Skip to content

Commit

Permalink
chore: synchonizer uses typed rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
nhtyy committed Jun 26, 2024
1 parent 3f46d54 commit 34cf2c6
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 236 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6870,6 +6870,7 @@ dependencies = [
"humantime-serde",
"lightning-interfaces",
"lightning-metrics",
"lightning-rpc",
"lightning-utils",
"rand",
"reqwest",
Expand Down
2 changes: 1 addition & 1 deletion core/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ pub mod config;
pub mod error;
mod logic;
mod server;
pub mod utils;
pub use server::create_hmac;

#[cfg(test)]
mod tests;

Expand Down
58 changes: 0 additions & 58 deletions core/rpc/src/utils.rs

This file was deleted.

1 change: 1 addition & 0 deletions core/syncronizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
lightning-interfaces = { path = "../interfaces" }
lightning-utils = { path = "../utils" }
lightning-metrics = { path = "../metrics" }
lightning-rpc = { path = "../rpc" }
futures.workspace = true
anyhow.workspace = true
tracing.workspace = true
Expand Down
244 changes: 113 additions & 131 deletions core/syncronizer/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,13 @@
use std::future::Future;
use std::net::IpAddr;

use anyhow::{anyhow, Result};
use fleek_crypto::NodePublicKey;
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use lightning_interfaces::types::{Epoch, EpochInfo, NodeIndex, NodeInfo};
use serde::de::DeserializeOwned;
use lightning_rpc::{Fleek, RpcClient};
use tokio::runtime::Handle;

pub async fn rpc_request<T: DeserializeOwned>(
client: &reqwest::Client,
ip: IpAddr,
port: u16,
req: String,
) -> Result<RpcResponse<T>> {
let res = client
.post(format!("http://{ip}:{port}/rpc/v0"))
.header("Content-Type", "application/json")
.body(req)
.send()
.await?;
if res.status().is_success() {
let value: serde_json::Value = res.json().await?;
if value.get("result").is_some() {
let value: RpcResponse<T> = serde_json::from_value(value)?;
Ok(value)
} else {
Err(anyhow!("Failed to parse response"))
}
} else {
Err(anyhow!("Request failed with status: {}", res.status()))
}
}

pub async fn ask_nodes<T: DeserializeOwned>(
req: String,
nodes: &[(NodeIndex, NodeInfo)],
rpc_client: &reqwest::Client,
) -> Result<Vec<T>> {
let mut futs = Vec::new();
for (_, node) in nodes {
let req_clone = req.clone();
let fut = async move {
rpc_request::<T>(rpc_client, node.domain, node.ports.rpc, req_clone)
.await
.ok()
};
futs.push(fut);
}

let results: Vec<T> = futures::future::join_all(futs)
.await
.into_iter()
.flatten()
.map(|x| x.result)
.collect();

if results.is_empty() {
Err(anyhow!("Unable to get a response from nodes"))
} else {
Ok(results)
}
}

/// Runs the given future to completion on the current tokio runtime.
/// This call is intentionally blocking.
pub fn sync_call<F>(fut: F) -> F::Output
Expand All @@ -76,51 +22,78 @@ where
}

/// Returns the epoch info from the epoch the bootstrap nodes are on
pub async fn get_epoch_info(
nodes: Vec<(NodeIndex, NodeInfo)>,
rpc_client: reqwest::Client,
) -> Result<EpochInfo> {
let mut epochs: Vec<EpochInfo> =
ask_nodes(rpc_epoch_info().to_string(), &nodes, &rpc_client).await?;
pub async fn get_epoch_info(nodes: Vec<(NodeIndex, NodeInfo)>) -> Result<EpochInfo> {
let mut epochs = ask_epoch_info(nodes).await;

if epochs.is_empty() {
return Err(anyhow!("Failed to get epoch info from bootstrap nodes"));
}
epochs.sort_by(|a, b| a.epoch.partial_cmp(&b.epoch).unwrap());
Ok(epochs.pop().unwrap())
}

/// A list of the nodes reported epoch info
///
/// ### Empty if all the requests fail
pub async fn ask_epoch_info(nodes: Vec<(NodeIndex, NodeInfo)>) -> Vec<EpochInfo> {
nodes
.into_iter()
.map(|(_, node)| async move {
let client =
RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
.ok()?;

client.get_epoch_info().await.ok()
})
.collect::<FuturesOrdered<_>>()
.filter_map(std::future::ready)
.collect::<Vec<_>>()
.await
}

/// Returns the node info for our node, if it's already on the state.
pub async fn get_node_info(
node_public_key: NodePublicKey,
nodes: Vec<(NodeIndex, NodeInfo)>,
rpc_client: reqwest::Client,
) -> Result<Option<NodeInfo>> {
let mut node_info: Vec<(Option<NodeInfo>, Epoch)> = ask_nodes(
rpc_node_info(&node_public_key).to_string(),
&nodes,
&rpc_client,
)
.await?;
let mut node_info = ask_node_info(nodes, node_public_key).await;

if node_info.is_empty() {
return Err(anyhow!("Failed to get node info from bootstrap nodes"));
}

node_info.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap());
Ok(node_info.pop().unwrap().0)
}

/// A list of the nodes reported node info
///
/// ### Empty if all the requests fail
pub async fn ask_node_info(
nodes: Vec<(NodeIndex, NodeInfo)>,
pk: NodePublicKey,
) -> Vec<(Option<NodeInfo>, Epoch)> {
nodes
.into_iter()
.map(|(_, node)| async move {
let client =
RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
.ok()?;

client.get_node_info_epoch(pk).await.ok()
})
.collect::<FuturesOrdered<_>>()
.filter_map(std::future::ready)
.collect::<Vec<_>>()
.await
}

/// Returns the node info for our node, if it's already on the state.
pub async fn check_is_valid_node(
node_public_key: NodePublicKey,
nodes: Vec<(NodeIndex, NodeInfo)>,
rpc_client: reqwest::Client,
) -> Result<bool> {
let mut is_valid: Vec<(bool, Epoch)> = ask_nodes(
rpc_is_valid_node(&node_public_key).to_string(),
&nodes,
&rpc_client,
)
.await?;
let mut is_valid = ask_is_valid_node(nodes, node_public_key).await;

if is_valid.is_empty() {
return Err(anyhow!("Failed to get node validity from bootstrap nodes"));
Expand All @@ -129,13 +102,31 @@ pub async fn check_is_valid_node(
Ok(is_valid.pop().unwrap().0)
}

/// Ask the nodes if the given node is valid
///
/// ### Empty if all the requests fail
pub async fn ask_is_valid_node(
nodes: Vec<(NodeIndex, NodeInfo)>,
pk: NodePublicKey,
) -> Vec<(bool, Epoch)> {
nodes
.into_iter()
.map(|(_, node)| async move {
let client =
RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
.ok()?;

client.is_valid_node_epoch(pk).await.ok()
})
.collect::<FuturesOrdered<_>>()
.filter_map(std::future::ready)
.collect::<Vec<_>>()
.await
}

/// Returns the hash of the last epoch ckpt, and the current epoch.
pub async fn last_epoch_hash(
nodes: &[(NodeIndex, NodeInfo)],
rpc_client: &reqwest::Client,
) -> Result<[u8; 32]> {
let mut hash: Vec<([u8; 32], Epoch)> =
ask_nodes(rpc_last_epoch_hash().to_string(), nodes, rpc_client).await?;
pub async fn last_epoch_hash(nodes: &[(NodeIndex, NodeInfo)]) -> Result<[u8; 32]> {
let mut hash = ask_last_epoch_hash(nodes.to_vec()).await;

if hash.is_empty() {
return Err(anyhow!(
Expand All @@ -146,55 +137,46 @@ pub async fn last_epoch_hash(
Ok(hash.pop().unwrap().0)
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct RpcResponse<T> {
pub jsonrpc: String,
pub id: usize,
pub result: T,
}

// todo(dalton): Lazy static?
pub fn rpc_last_epoch_hash() -> serde_json::Value {
serde_json::json!({
"jsonrpc": "2.0",
"method":"flk_get_last_epoch_hash",
"params":[],
"id":1,
})
}

pub fn rpc_epoch() -> serde_json::Value {
serde_json::json!({
"jsonrpc": "2.0",
"method":"flk_get_epoch",
"params":[],
"id":1,
})
}

pub fn rpc_epoch_info() -> serde_json::Value {
serde_json::json!({
"jsonrpc": "2.0",
"method":"flk_get_epoch_info",
"params":[],
"id":1,
})
/// A list of the nodes reported last epoch hash
///
/// ### Empty if all the requests fail
pub async fn ask_last_epoch_hash(nodes: Vec<(NodeIndex, NodeInfo)>) -> Vec<([u8; 32], Epoch)> {
nodes
.into_iter()
.map(|(_, node)| async move {
let client =
RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
.ok()?;

client.get_last_epoch_hash().await.ok()
})
.collect::<FuturesOrdered<_>>()
.filter_map(std::future::ready)
.collect::<Vec<_>>()
.await
}

pub fn rpc_node_info(public_key: &NodePublicKey) -> serde_json::Value {
serde_json::json!({
"jsonrpc": "2.0",
"method":"flk_get_node_info_epoch",
"params":{"public_key": public_key},
"id":1,
})
}
/// Returns the epoch info from the epoch the bootstrap nodes are on
///
/// ### Empty if all the requests fail
pub async fn get_epoch(nodes: Vec<(NodeIndex, NodeInfo)>) -> Result<Vec<Epoch>> {
let results = nodes
.into_iter()
.map(|(_, node)| async move {
let client =
RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
.ok()?;

client.get_epoch().await.ok()
})
.collect::<FuturesOrdered<_>>()
.filter_map(std::future::ready)
.collect::<Vec<_>>()
.await;

pub fn rpc_is_valid_node(public_key: &NodePublicKey) -> serde_json::Value {
serde_json::json!({
"jsonrpc": "2.0",
"method":"flk_is_valid_node_epoch",
"params":{"public_key": public_key},
"id":1,
})
if results.is_empty() {
Err(anyhow!("Unable to get a response from nodes"))
} else {
Ok(results)
}
}
Loading

0 comments on commit 34cf2c6

Please sign in to comment.