From 7dbae32db39b76b7405f0ea904bb0c687e2d33b8 Mon Sep 17 00:00:00 2001 From: Dvir Yosef Date: Mon, 23 Dec 2024 17:00:13 +0200 Subject: [PATCH] feat(sequencing): add client to cende --- Cargo.lock | 5 ++ config/sequencer/default_config.json | 10 +++ .../papyrus_consensus_orchestrator/Cargo.toml | 5 +- .../src/cende/mod.rs | 82 ++++++++++++++++--- .../starknet_consensus_manager/src/config.rs | 8 +- .../src/consensus_manager.rs | 2 +- crates/starknet_integration_tests/Cargo.toml | 1 + .../src/config_utils.rs | 1 + .../starknet_integration_tests/src/utils.rs | 4 + crates/starknet_sequencer_node/Cargo.toml | 3 +- .../src/config/node_config.rs | 8 ++ .../src/config/test_utils.rs | 3 + 12 files changed, 115 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a9e185d4d..458989008c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7404,11 +7404,13 @@ dependencies = [ "infra_utils", "lazy_static", "mockall", + "papyrus_config", "papyrus_consensus", "papyrus_network", "papyrus_protobuf", "papyrus_storage", "papyrus_test_utils", + "reqwest 0.11.27", "serde", "serde_json", "starknet-types-core", @@ -7418,6 +7420,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "url", ] [[package]] @@ -10542,6 +10545,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "url", ] [[package]] @@ -10759,6 +10763,7 @@ dependencies = [ "starknet_state_sync_types", "tokio", "tracing", + "url", "validator", ] diff --git a/config/sequencer/default_config.json b/config/sequencer/default_config.json index 0d3d8ef4d6..c6aaf6bcbb 100644 --- a/config/sequencer/default_config.json +++ b/config/sequencer/default_config.json @@ -609,6 +609,11 @@ "privacy": "Public", "value": "0.0.0.0:8080" }, + "consensus_manager_config.cende_config.recorder_url": { + "description": "The URL of the Pythonic cende_recorder", + "pointer_target": "recorder_url", + "privacy": "Private" + }, "consensus_manager_config.consensus_config.chain_id": { "description": "The chain id of the Starknet chain.", "pointer_target": "chain_id", @@ -949,6 +954,11 @@ "privacy": "Public", "value": 8082 }, + "recorder_url": { + "description": "A required param! The URL of the Pythonic cende_recorder", + "param_type": "String", + "privacy": "TemporaryValue" + }, "rpc_state_reader_config.json_rpc_version": { "description": "The json rpc version.", "privacy": "Public", diff --git a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml index b6f66304b2..84b5c3f7e4 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml +++ b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml @@ -16,13 +16,12 @@ papyrus_consensus.workspace = true papyrus_network.workspace = true papyrus_protobuf.workspace = true papyrus_storage.workspace = true -serde.workspace = true -starknet-types-core.workspace = true +reqwest.workspace = true starknet_api.workspace = true -starknet_batcher_types = { workspace = true, features = ["testing"] } tokio = { workspace = true, features = ["full"] } tokio-util.workspace = true tracing.workspace = true +url = { workspace = true, features = ["serde"] } [dev-dependencies] infra_utils.workspace = true diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs index 94dbbf15c0..b6ce0e1c15 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs @@ -1,20 +1,26 @@ mod central_objects; +use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; use futures::channel::oneshot; #[cfg(test)] use mockall::automock; +use papyrus_config::dumping::{ser_param, SerializeConfig}; +use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use tokio::sync::Mutex; use tokio::task::{self}; use tracing::debug; +use url::Url; // TODO(dvir): add tests when will have more logic. /// A chunk of all the data to write to Aersopike. -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct AerospikeBlob { // TODO(yael, dvir): add the blob fields. } @@ -31,17 +37,58 @@ pub trait CendeContext: Send + Sync { async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters); } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct CendeAmbassador { // TODO(dvir): consider creating enum varaiant instead of the `Option`. // `None` indicates that there is no blob to write, and therefore, the node can't be the // proposer. prev_height_blob: Arc>>, + url: Url, + client: Client, } +/// The path to write blob in the Recorder. +const RECORDER_WRITE_BLOB_PATH: &str = "/write_blob"; + impl CendeAmbassador { - pub fn new() -> Self { - CendeAmbassador { prev_height_blob: Arc::new(Mutex::new(None)) } + pub fn new(cende_config: CendeConfig) -> Self { + CendeAmbassador { + prev_height_blob: Arc::new(Mutex::new(None)), + url: cende_config + .recorder_url + .join(RECORDER_WRITE_BLOB_PATH) + .expect("Failed to join `RECORDER_WRITE_BLOB_PATH` with the Recorder URL"), + client: Client::new(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct CendeConfig { + pub recorder_url: Url, +} + +impl Default for CendeConfig { + fn default() -> Self { + CendeConfig { + // TODO(dvir): change this default value to "https://". The reason for the + // current value is to make the `end_to_end_flow_test` to pass (it creates the default + // config). + recorder_url: "https://recorder_url" + .parse() + .expect("recorder_url must be a valid Recorder URL"), + } + } +} + +impl SerializeConfig for CendeConfig { + fn dump(&self) -> BTreeMap { + BTreeMap::from_iter([ser_param( + "recorder_url", + &self.recorder_url, + "The URL of the Pythonic cende_recorder", + ParamPrivacyInput::Private, + )]) } } @@ -50,13 +97,14 @@ impl CendeContext for CendeAmbassador { fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); let prev_height_blob = self.prev_height_blob.clone(); + let request_builder = self.client.post(self.url.clone()); task::spawn(async move { // TODO(dvir): remove this when handle the booting up case. // Heights that are permitted to be built without writing to Aerospike. // Height 1 to make `end_to_end_flow` test pass. - const PERMITTED_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)]; + const NO_WRITE_REQUIRED_HEIGHTS: [BlockNumber; 1] = [BlockNumber(1)]; - if PERMITTED_HEIGHTS.contains(&height) { + if NO_WRITE_REQUIRED_HEIGHTS.contains(&height) { debug!( "height {} is in `PERMITTED_HEIGHTS`, consensus can send proposal without \ writing to Aerospike", @@ -65,16 +113,30 @@ impl CendeContext for CendeAmbassador { sender.send(true).unwrap(); return; } - let Some(ref _blob) = *prev_height_blob.lock().await else { + let Some(ref blob) = *prev_height_blob.lock().await else { debug!("No blob to write to Aerospike."); sender.send(false).expect("Writing to a one-shot sender should succeed."); return; }; - // TODO(dvir): write blob to AS. // TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS. debug!("Writing blob to Aerospike."); - sender.send(true).expect("Writing to a one-shot sender should succeed."); - debug!("Blob writing to Aerospike completed."); + match request_builder.json(blob).send().await { + Ok(response) => { + if response.status().is_success() { + debug!("Blob written to Aerospike successfully."); + sender.send(true).expect("Writing to a one-shot sender should succeed."); + } else { + debug!("Failed to write blob to Aerospike. Status: {}", response.status()); + sender.send(false).expect("Writing to a one-shot sender should succeed."); + } + } + Err(err) => { + debug!("Failed to write blob to Aerospike. Error: {}", err); + // TODO(dvir): change this to `false`. The reason for the current value is to + // make the `end_to_end_flow_test` to pass. + sender.send(true).expect("Writing to a one-shot sender should succeed."); + } + } }); receiver diff --git a/crates/starknet_consensus_manager/src/config.rs b/crates/starknet_consensus_manager/src/config.rs index 84ca95dfd2..58c1ceafbb 100644 --- a/crates/starknet_consensus_manager/src/config.rs +++ b/crates/starknet_consensus_manager/src/config.rs @@ -3,6 +3,7 @@ use std::collections::BTreeMap; use papyrus_config::dumping::{append_sub_config_name, SerializeConfig}; use papyrus_config::{ParamPath, SerializedParam}; use papyrus_consensus::config::ConsensusConfig; +use papyrus_consensus_orchestrator::cende::CendeConfig; use serde::{Deserialize, Serialize}; use validator::Validate; @@ -11,12 +12,15 @@ use validator::Validate; #[derive(Clone, Default, Debug, Serialize, Deserialize, Validate, PartialEq)] pub struct ConsensusManagerConfig { pub consensus_config: ConsensusConfig, + pub cende_config: CendeConfig, } impl SerializeConfig for ConsensusManagerConfig { fn dump(&self) -> BTreeMap { - let sub_configs = - vec![append_sub_config_name(self.consensus_config.dump(), "consensus_config")]; + let sub_configs = vec![ + append_sub_config_name(self.consensus_config.dump(), "consensus_config"), + append_sub_config_name(self.cende_config.dump(), "cende_config"), + ]; sub_configs.into_iter().flatten().collect() } diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs index b5fcf4bdec..af463e2038 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager.rs @@ -86,7 +86,7 @@ impl ConsensusManager { votes_broadcast_channels.broadcast_topic_client.clone(), self.config.consensus_config.num_validators, self.config.consensus_config.chain_id.clone(), - Arc::new(CendeAmbassador::new()), + Arc::new(CendeAmbassador::new(self.config.cende_config.clone())), ); let mut network_handle = tokio::task::spawn(network_manager.run()); diff --git a/crates/starknet_integration_tests/Cargo.toml b/crates/starknet_integration_tests/Cargo.toml index 7743194f5d..546754cb30 100644 --- a/crates/starknet_integration_tests/Cargo.toml +++ b/crates/starknet_integration_tests/Cargo.toml @@ -42,6 +42,7 @@ strum.workspace = true tempfile.workspace = true tokio.workspace = true tracing.workspace = true +url.workspace = true [dev-dependencies] futures.workspace = true diff --git a/crates/starknet_integration_tests/src/config_utils.rs b/crates/starknet_integration_tests/src/config_utils.rs index 82dcd36700..c4386833c8 100644 --- a/crates/starknet_integration_tests/src/config_utils.rs +++ b/crates/starknet_integration_tests/src/config_utils.rs @@ -53,6 +53,7 @@ pub(crate) fn dump_config_file_changes( required_params.eth_fee_token_address, required_params.strk_fee_token_address, required_params.validator_id, + required_params.recorder_url, ); // Create the entire mapping of the config and the pointers, without the required params. diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index f05a6793e3..118840083e 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -37,6 +37,7 @@ use starknet_sequencer_node::config::node_config::SequencerNodeConfig; use starknet_sequencer_node::config::test_utils::RequiredParams; use starknet_state_sync::config::StateSyncConfig; use starknet_types_core::felt::Felt; +use url::Url; pub fn create_chain_info() -> ChainInfo { let mut chain_info = ChainInfo::create_for_testing(); @@ -89,6 +90,8 @@ pub async fn create_config( eth_fee_token_address: fee_token_addresses.eth_fee_token_address, strk_fee_token_address: fee_token_addresses.strk_fee_token_address, validator_id, + // TODO(dvir): change this to real value when add recorder to integration tests. + recorder_url: Url::parse("https://recorder_url").expect("The URL is valid"), }, ) } @@ -126,6 +129,7 @@ pub fn create_consensus_manager_configs_and_channels( timeouts: timeouts.clone(), ..Default::default() }, + ..Default::default() }) .collect(); diff --git a/crates/starknet_sequencer_node/Cargo.toml b/crates/starknet_sequencer_node/Cargo.toml index 636b021f31..2d29cabd4e 100644 --- a/crates/starknet_sequencer_node/Cargo.toml +++ b/crates/starknet_sequencer_node/Cargo.toml @@ -6,7 +6,7 @@ repository.workspace = true license.workspace = true [features] -testing = ["papyrus_proc_macros"] +testing = ["papyrus_proc_macros", "url"] [lints] workspace = true @@ -42,6 +42,7 @@ starknet_state_sync.workspace = true starknet_state_sync_types.workspace = true tokio.workspace = true tracing.workspace = true +url = { workspace = true, optional = true } validator.workspace = true [dev-dependencies] diff --git a/crates/starknet_sequencer_node/src/config/node_config.rs b/crates/starknet_sequencer_node/src/config/node_config.rs index d8a289ad80..f7d67e837a 100644 --- a/crates/starknet_sequencer_node/src/config/node_config.rs +++ b/crates/starknet_sequencer_node/src/config/node_config.rs @@ -89,6 +89,14 @@ pub static CONFIG_POINTERS: LazyLock = LazyLock::new(|| { ), set_pointing_param_paths(&["consensus_manager_config.consensus_config.validator_id"]), ), + ( + ser_pointer_target_required_param( + "recorder_url", + SerializationType::String, + "The URL of the Pythonic cende_recorder", + ), + set_pointing_param_paths(&["consensus_manager_config.cende_config.recorder_url"]), + ), ]; let mut common_execution_config = generate_struct_pointer( "versioned_constants_overrides".to_owned(), diff --git a/crates/starknet_sequencer_node/src/config/test_utils.rs b/crates/starknet_sequencer_node/src/config/test_utils.rs index e25a95a6cf..6c351f4f83 100644 --- a/crates/starknet_sequencer_node/src/config/test_utils.rs +++ b/crates/starknet_sequencer_node/src/config/test_utils.rs @@ -3,6 +3,7 @@ use std::vec::Vec; // Used by #[gen_field_names_fn]. use papyrus_proc_macros::gen_field_names_and_cli_args_fn; use papyrus_protobuf::consensus::DEFAULT_VALIDATOR_ID; use starknet_api::core::{ChainId, ContractAddress}; +use url::Url; use crate::config::node_config::node_command; @@ -13,6 +14,7 @@ pub struct RequiredParams { pub eth_fee_token_address: ContractAddress, pub strk_fee_token_address: ContractAddress, pub validator_id: ContractAddress, + pub recorder_url: Url, } impl RequiredParams { @@ -22,6 +24,7 @@ impl RequiredParams { eth_fee_token_address: ContractAddress::from(2_u128), strk_fee_token_address: ContractAddress::from(3_u128), validator_id: ContractAddress::from(DEFAULT_VALIDATOR_ID), + recorder_url: Url::parse("https://recorder_url").expect("The URL is valid"), } } }