From b6be2f8b0ecd06e0390a7acec01ea95149b29bb5 Mon Sep 17 00:00:00 2001 From: Dvir Yosef Date: Mon, 23 Dec 2024 17:00:13 +0200 Subject: [PATCH] feat(sequencing): add client to cende --- Cargo.lock | 4 + config/sequencer/default_config.json | 5 ++ .../papyrus_consensus_orchestrator/Cargo.toml | 4 + .../src/cende/mod.rs | 78 +++++++++++++++++-- .../starknet_consensus_manager/src/config.rs | 8 +- .../src/consensus_manager.rs | 2 +- .../starknet_integration_tests/src/utils.rs | 1 + 7 files changed, 91 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f15bb5903d..481f4bff38f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7399,11 +7399,14 @@ dependencies = [ "futures", "lazy_static", "mockall", + "papyrus_config", "papyrus_consensus", "papyrus_network", "papyrus_protobuf", "papyrus_storage", "papyrus_test_utils", + "reqwest 0.11.27", + "serde", "starknet-types-core", "starknet_api", "starknet_batcher_types", @@ -7411,6 +7414,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "url", ] [[package]] diff --git a/config/sequencer/default_config.json b/config/sequencer/default_config.json index 0d3d8ef4d64..7ae64958cd7 100644 --- a/config/sequencer/default_config.json +++ b/config/sequencer/default_config.json @@ -609,6 +609,11 @@ "privacy": "Public", "value": "0.0.0.0:8080" }, + "consensus_manager_config.cende_config.recorder_url": { + "description": "A required param! The URL of the Pythonic cende_recorder", + "param_type": "String", + "privacy": "Private" + }, "consensus_manager_config.consensus_config.chain_id": { "description": "The chain id of the Starknet chain.", "pointer_target": "chain_id", diff --git a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml index 172f59af145..6d953ae8921 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml +++ b/crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml @@ -10,16 +10,20 @@ description = "Implements the consensus context and orchestrates the node's comp async-trait.workspace = true chrono.workspace = true futures.workspace = true +papyrus_config.workspace = true papyrus_consensus.workspace = true papyrus_network.workspace = true papyrus_protobuf.workspace = true papyrus_storage.workspace = true +reqwest.workspace =true starknet-types-core.workspace = true starknet_api.workspace = true starknet_batcher_types = { workspace = true, features = ["testing"] } +serde.workspace = true tokio = { workspace = true, features = ["full"] } tokio-util.workspace = true tracing.workspace = true +url = { workspace = true, features = ["serde"] } [dev-dependencies] lazy_static.workspace = true diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs index 4fc701ced57..e79eec207b4 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs @@ -1,18 +1,24 @@ +use std::collections::BTreeMap; use std::sync::Arc; use async_trait::async_trait; use futures::channel::oneshot; #[cfg(test)] use mockall::automock; +use papyrus_config::dumping::{ser_required_param, SerializeConfig}; +use papyrus_config::{ParamPath, ParamPrivacyInput, SerializationType, SerializedParam}; +use reqwest::Client; +use serde::{Deserialize, Serialize}; use starknet_api::block::BlockNumber; use tokio::sync::Mutex; use tokio::task::{self}; use tracing::debug; +use url::Url; // TODO(dvir): add tests when will have more logic. /// A chunk of all the data to write to Aersopike. -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct AerospikeBlob { // TODO(yael, dvir): add the blob fields. } @@ -29,17 +35,58 @@ pub trait CendeContext: Send + Sync { async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters); } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct CendeAmbassador { // TODO(dvir): consider creating enum varaiant instead of the `Option`. // `None` indicates that there is no blob to write, and therefore, the node can't be the // proposer. prev_height_blob: Arc>>, + url: Url, + client: Client, } +/// The path to write blob in the Recorder. +const RECORDER_WRITE_BLOB_PATH: &str = "/write_blob"; + impl CendeAmbassador { - pub fn new() -> Self { - CendeAmbassador { prev_height_blob: Arc::new(Mutex::new(None)) } + pub fn new(cende_config: CendeConfig) -> Self { + CendeAmbassador { + prev_height_blob: Arc::new(Mutex::new(None)), + url: cende_config + .recorder_url + .join(RECORDER_WRITE_BLOB_PATH) + .expect("Failed to join `RECORDER_WRITE_BLOB_PATH` with the Recorder URL"), + client: Client::new(), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct CendeConfig { + pub recorder_url: Url, +} + +impl Default for CendeConfig { + fn default() -> Self { + CendeConfig { + // TODO(dvir): change this default value to "https://". The reason for the + // current value is to make the `end_to_end_flow_test` to pass (it creates the default + // config). + recorder_url: "https://recorder_url" + .parse() + .expect("recorder_url must be a valid Recorder URL"), + } + } +} + +impl SerializeConfig for CendeConfig { + fn dump(&self) -> BTreeMap { + BTreeMap::from_iter([ser_required_param( + "recorder_url", + SerializationType::String, + "The URL of the Pythonic cende_recorder", + ParamPrivacyInput::Private, + )]) } } @@ -48,6 +95,7 @@ impl CendeContext for CendeAmbassador { fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver { let (sender, receiver) = oneshot::channel(); let prev_height_blob = self.prev_height_blob.clone(); + let request_builder = self.client.post(self.url.clone()); task::spawn(async move { // TODO(dvir): remove this when handle the booting up case. // Heights that are permitted to be built without writing to Aerospike. @@ -63,16 +111,30 @@ impl CendeContext for CendeAmbassador { sender.send(true).unwrap(); return; } - let Some(ref _blob) = *prev_height_blob.lock().await else { + let Some(ref blob) = *prev_height_blob.lock().await else { debug!("No blob to write to Aerospike."); sender.send(false).expect("Writing to a one-shot sender should succeed."); return; }; - // TODO(dvir): write blob to AS. // TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS. debug!("Writing blob to Aerospike."); - sender.send(true).expect("Writing to a one-shot sender should succeed."); - debug!("Blob writing to Aerospike completed."); + match request_builder.json(blob).send().await { + Ok(response) => { + if response.status().is_success() { + debug!("Blob written to Aerospike successfully."); + sender.send(true).expect("Writing to a one-shot sender should succeed."); + } else { + debug!("Failed to write blob to Aerospike. Status: {}", response.status()); + sender.send(false).expect("Writing to a one-shot sender should succeed."); + } + } + Err(err) => { + debug!("Failed to write blob to Aerospike. Error: {}", err); + // TODO(dvir): change this to `false`. The reason for the current value is to + // make the `end_to_end_flow_test` to pass. + sender.send(true).expect("Writing to a one-shot sender should succeed."); + } + } }); receiver diff --git a/crates/starknet_consensus_manager/src/config.rs b/crates/starknet_consensus_manager/src/config.rs index 84ca95dfd21..58c1ceafbbc 100644 --- a/crates/starknet_consensus_manager/src/config.rs +++ b/crates/starknet_consensus_manager/src/config.rs @@ -3,6 +3,7 @@ use std::collections::BTreeMap; use papyrus_config::dumping::{append_sub_config_name, SerializeConfig}; use papyrus_config::{ParamPath, SerializedParam}; use papyrus_consensus::config::ConsensusConfig; +use papyrus_consensus_orchestrator::cende::CendeConfig; use serde::{Deserialize, Serialize}; use validator::Validate; @@ -11,12 +12,15 @@ use validator::Validate; #[derive(Clone, Default, Debug, Serialize, Deserialize, Validate, PartialEq)] pub struct ConsensusManagerConfig { pub consensus_config: ConsensusConfig, + pub cende_config: CendeConfig, } impl SerializeConfig for ConsensusManagerConfig { fn dump(&self) -> BTreeMap { - let sub_configs = - vec![append_sub_config_name(self.consensus_config.dump(), "consensus_config")]; + let sub_configs = vec![ + append_sub_config_name(self.consensus_config.dump(), "consensus_config"), + append_sub_config_name(self.cende_config.dump(), "cende_config"), + ]; sub_configs.into_iter().flatten().collect() } diff --git a/crates/starknet_consensus_manager/src/consensus_manager.rs b/crates/starknet_consensus_manager/src/consensus_manager.rs index a40c502b5be..cdf6aa664bf 100644 --- a/crates/starknet_consensus_manager/src/consensus_manager.rs +++ b/crates/starknet_consensus_manager/src/consensus_manager.rs @@ -86,7 +86,7 @@ impl ConsensusManager { votes_broadcast_channels.broadcast_topic_client.clone(), self.config.consensus_config.num_validators, self.config.consensus_config.chain_id.clone(), - Arc::new(CendeAmbassador::new()), + Arc::new(CendeAmbassador::new(self.config.cende_config.clone())), ); let mut network_handle = tokio::task::spawn(network_manager.run()); diff --git a/crates/starknet_integration_tests/src/utils.rs b/crates/starknet_integration_tests/src/utils.rs index f05a6793e3b..8a3b597a873 100644 --- a/crates/starknet_integration_tests/src/utils.rs +++ b/crates/starknet_integration_tests/src/utils.rs @@ -126,6 +126,7 @@ pub fn create_consensus_manager_configs_and_channels( timeouts: timeouts.clone(), ..Default::default() }, + ..Default::default() }) .collect();