Skip to content

Commit

Permalink
feat(sequencing): add client to cende
Browse files Browse the repository at this point in the history
  • Loading branch information
DvirYo-starkware committed Dec 23, 2024
1 parent 64f4a21 commit b6be2f8
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 11 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions config/sequencer/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,11 @@
"privacy": "Public",
"value": "0.0.0.0:8080"
},
"consensus_manager_config.cende_config.recorder_url": {
"description": "A required param! The URL of the Pythonic cende_recorder",
"param_type": "String",
"privacy": "Private"
},
"consensus_manager_config.consensus_config.chain_id": {
"description": "The chain id of the Starknet chain.",
"pointer_target": "chain_id",
Expand Down
4 changes: 4 additions & 0 deletions crates/sequencing/papyrus_consensus_orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,20 @@ description = "Implements the consensus context and orchestrates the node's comp
async-trait.workspace = true
chrono.workspace = true
futures.workspace = true
papyrus_config.workspace = true
papyrus_consensus.workspace = true
papyrus_network.workspace = true
papyrus_protobuf.workspace = true
papyrus_storage.workspace = true
reqwest.workspace =true
starknet-types-core.workspace = true
starknet_api.workspace = true
starknet_batcher_types = { workspace = true, features = ["testing"] }
serde.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util.workspace = true
tracing.workspace = true
url = { workspace = true, features = ["serde"] }

[dev-dependencies]
lazy_static.workspace = true
Expand Down
78 changes: 70 additions & 8 deletions crates/sequencing/papyrus_consensus_orchestrator/src/cende/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use async_trait::async_trait;
use futures::channel::oneshot;
#[cfg(test)]
use mockall::automock;
use papyrus_config::dumping::{ser_required_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializationType, SerializedParam};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use tokio::sync::Mutex;
use tokio::task::{self};
use tracing::debug;
use url::Url;

// TODO(dvir): add tests when will have more logic.

/// A chunk of all the data to write to Aersopike.
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct AerospikeBlob {
// TODO(yael, dvir): add the blob fields.
}
Expand All @@ -29,17 +35,58 @@ pub trait CendeContext: Send + Sync {
async fn prepare_blob_for_next_height(&self, blob_parameters: BlobParameters);
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug)]
pub struct CendeAmbassador {
// TODO(dvir): consider creating enum varaiant instead of the `Option<AerospikeBlob>`.
// `None` indicates that there is no blob to write, and therefore, the node can't be the
// proposer.
prev_height_blob: Arc<Mutex<Option<AerospikeBlob>>>,
url: Url,
client: Client,
}

/// The path to write blob in the Recorder.
const RECORDER_WRITE_BLOB_PATH: &str = "/write_blob";

impl CendeAmbassador {
pub fn new() -> Self {
CendeAmbassador { prev_height_blob: Arc::new(Mutex::new(None)) }
pub fn new(cende_config: CendeConfig) -> Self {
CendeAmbassador {
prev_height_blob: Arc::new(Mutex::new(None)),
url: cende_config
.recorder_url
.join(RECORDER_WRITE_BLOB_PATH)
.expect("Failed to join `RECORDER_WRITE_BLOB_PATH` with the Recorder URL"),
client: Client::new(),
}
}
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct CendeConfig {
pub recorder_url: Url,
}

impl Default for CendeConfig {
fn default() -> Self {
CendeConfig {
// TODO(dvir): change this default value to "https://<recorder_url>". The reason for the
// current value is to make the `end_to_end_flow_test` to pass (it creates the default
// config).
recorder_url: "https://recorder_url"
.parse()
.expect("recorder_url must be a valid Recorder URL"),
}
}
}

impl SerializeConfig for CendeConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([ser_required_param(
"recorder_url",
SerializationType::String,
"The URL of the Pythonic cende_recorder",
ParamPrivacyInput::Private,
)])
}
}

Expand All @@ -48,6 +95,7 @@ impl CendeContext for CendeAmbassador {
fn write_prev_height_blob(&self, height: BlockNumber) -> oneshot::Receiver<bool> {
let (sender, receiver) = oneshot::channel();
let prev_height_blob = self.prev_height_blob.clone();
let request_builder = self.client.post(self.url.clone());
task::spawn(async move {
// TODO(dvir): remove this when handle the booting up case.
// Heights that are permitted to be built without writing to Aerospike.
Expand All @@ -63,16 +111,30 @@ impl CendeContext for CendeAmbassador {
sender.send(true).unwrap();
return;
}
let Some(ref _blob) = *prev_height_blob.lock().await else {
let Some(ref blob) = *prev_height_blob.lock().await else {
debug!("No blob to write to Aerospike.");
sender.send(false).expect("Writing to a one-shot sender should succeed.");
return;
};
// TODO(dvir): write blob to AS.
// TODO(dvir): consider set `prev_height_blob` to `None` after writing to AS.
debug!("Writing blob to Aerospike.");
sender.send(true).expect("Writing to a one-shot sender should succeed.");
debug!("Blob writing to Aerospike completed.");
match request_builder.json(blob).send().await {
Ok(response) => {
if response.status().is_success() {
debug!("Blob written to Aerospike successfully.");
sender.send(true).expect("Writing to a one-shot sender should succeed.");
} else {
debug!("Failed to write blob to Aerospike. Status: {}", response.status());
sender.send(false).expect("Writing to a one-shot sender should succeed.");
}
}
Err(err) => {
debug!("Failed to write blob to Aerospike. Error: {}", err);
// TODO(dvir): change this to `false`. The reason for the current value is to
// make the `end_to_end_flow_test` to pass.
sender.send(true).expect("Writing to a one-shot sender should succeed.");
}
}
});

receiver
Expand Down
8 changes: 6 additions & 2 deletions crates/starknet_consensus_manager/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::BTreeMap;
use papyrus_config::dumping::{append_sub_config_name, SerializeConfig};
use papyrus_config::{ParamPath, SerializedParam};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus_orchestrator::cende::CendeConfig;
use serde::{Deserialize, Serialize};
use validator::Validate;

Expand All @@ -11,12 +12,15 @@ use validator::Validate;
#[derive(Clone, Default, Debug, Serialize, Deserialize, Validate, PartialEq)]
pub struct ConsensusManagerConfig {
pub consensus_config: ConsensusConfig,
pub cende_config: CendeConfig,
}

impl SerializeConfig for ConsensusManagerConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
let sub_configs =
vec![append_sub_config_name(self.consensus_config.dump(), "consensus_config")];
let sub_configs = vec![
append_sub_config_name(self.consensus_config.dump(), "consensus_config"),
append_sub_config_name(self.cende_config.dump(), "cende_config"),
];

sub_configs.into_iter().flatten().collect()
}
Expand Down
2 changes: 1 addition & 1 deletion crates/starknet_consensus_manager/src/consensus_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl ConsensusManager {
votes_broadcast_channels.broadcast_topic_client.clone(),
self.config.consensus_config.num_validators,
self.config.consensus_config.chain_id.clone(),
Arc::new(CendeAmbassador::new()),
Arc::new(CendeAmbassador::new(self.config.cende_config.clone())),
);

let mut network_handle = tokio::task::spawn(network_manager.run());
Expand Down
1 change: 1 addition & 0 deletions crates/starknet_integration_tests/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ pub fn create_consensus_manager_configs_and_channels(
timeouts: timeouts.clone(),
..Default::default()
},
..Default::default()
})
.collect();

Expand Down

0 comments on commit b6be2f8

Please sign in to comment.