Skip to content

Commit

Permalink
replaced hyper with reqwest
Browse files Browse the repository at this point in the history
  • Loading branch information
idky137 committed Oct 8, 2024
1 parent 058a395 commit 666808a
Show file tree
Hide file tree
Showing 8 changed files with 337 additions and 132 deletions.
306 changes: 247 additions & 59 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ nym-bin-common = { git = "https://github.com/nymtech/nym", branch = "master" }
nym-sphinx-anonymous-replies = { git = "https://github.com/nymtech/nym", branch = "master" }

# Miscellaneous
tokio = { version = "1.37.0", features = ["full"] } # { version = "1.38", features = ["full"] }
tokio = { version = "=1.37.0", features = ["full"] } # { version = "1.38", features = ["full"] }
tonic = "0.10.2" # "0.12"
http = "0.2.4" # "1.1"
thiserror = "1.0.59" # "1.0"
Expand Down
5 changes: 2 additions & 3 deletions zaino-fetch/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ thiserror = { workspace = true }

# Miscellaneous Crate
prost = "0.12" # "0.13"
hyper = { version = "0.14.28", features = ["full"] } # { version = "1.4", features = ["full"] }
# reqwest = "0.12"
reqwest = "0.12"
url = "2.5"
serde_json = { version = "1.0.117", features = ["preserve_order"] } # { version = "1.0", features = ["preserve_order"] } # The preserve_order feature in serde_jsonn is a dependency of jsonrpc-core
serde = { version = "1.0.201", features = ["derive"] } # { version = "1.0", features = ["derive"] }
hyper-tls = "0.5" # "0.6"
hex = { version = "0.4.3", features = ["serde"] }
indexmap = { version = "2.2.6", features = ["serde"] }
base64 = "0.13.0" # "0.22"
Expand Down
2 changes: 1 addition & 1 deletion zaino-fetch/src/chain/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ pub async fn get_block_from_node(
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await;
.await?;
let block_1 = zebrad_client.get_block(height.to_string(), Some(1)).await;
match block_1 {
Ok(GetBlockResponse::Object {
Expand Down
7 changes: 3 additions & 4 deletions zaino-fetch/src/chain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use std::{collections::HashSet, time::SystemTime};
use tokio::sync::{Mutex, RwLock};

use crate::{
chain::error::MempoolError, jsonrpc::connector::JsonRpcConnector,
primitives::block::BlockHash,
chain::error::MempoolError, jsonrpc::connector::JsonRpcConnector, primitives::block::BlockHash,
};

/// Mempool state information.
Expand Down Expand Up @@ -58,7 +57,7 @@ impl Mempool {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await
.await?
.get_raw_mempool()
.await?
.transactions;
Expand Down Expand Up @@ -90,7 +89,7 @@ impl Mempool {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await
.await?
.get_blockchain_info()
.await?
.best_block_hash;
Expand Down
123 changes: 69 additions & 54 deletions zaino-fetch/src/jsonrpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@
//! TODO: - Add option for http connector.

use http::Uri;
use hyper::{http, Body, Client, Request};
use hyper_tls::HttpsConnector;
// use reqwest::{Client, Url};
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::atomic::{AtomicI32, Ordering};
Expand Down Expand Up @@ -45,33 +43,43 @@ struct RpcError {
/// JsonRPC Client config data.
#[derive(Debug)]
pub struct JsonRpcConnector {
uri: http::Uri,
url: Url,
id_counter: AtomicI32,
user: Option<String>,
password: Option<String>,
}

impl JsonRpcConnector {
/// Returns a new JsonRpcConnector instance, tests uri and returns error if connection is not established.
pub async fn new(uri: http::Uri, user: Option<String>, password: Option<String>) -> Self {
Self {
uri,
pub async fn new(
uri: Uri,
user: Option<String>,
password: Option<String>,
) -> Result<Self, JsonRpcConnectorError> {
let url = reqwest::Url::parse(&uri.to_string())?;
Ok(Self {
url,
id_counter: AtomicI32::new(0),
user,
password,
}
})
}

/// Returns the http::uri the JsonRpcConnector is configured to send requests to.
pub fn uri(&self) -> Result<Uri, JsonRpcConnectorError> {
Ok(self.url.as_str().parse()?)
}

/// Returns the uri the JsonRpcConnector is configured to send requests to.
pub fn uri(&self) -> &Uri {
&self.uri
/// Returns the reqwest::url the JsonRpcConnector is configured to send requests to.
pub fn url(&self) -> Url {
self.url.clone()
}

/// Sends a jsonRPC request and returns the response.
///
/// TODO: This function currently resends the call up to 5 times on a server response of "Work queue depth exceeded".
/// This is because the node's queue can become overloaded and stop servicing RPCs.
/// This functionality is weak and should be incorporated in Zingo-Indexer's queue mechanism [WIP] that handles various errors appropriately.
/// This functionality is weak and should be incorporated in Zaino's queue mechanism [WIP] that handles various errors appropriately.
async fn send_request<T: Serialize, R: for<'de> Deserialize<'de>>(
&self,
method: &str,
Expand All @@ -88,30 +96,33 @@ impl JsonRpcConnector {
let mut attempts = 0;
loop {
attempts += 1;
let client = Client::builder().build(HttpsConnector::new());
let mut request_builder = Request::builder()
.method("POST")
.uri(self.uri.clone())
let client = Client::builder()
.connect_timeout(std::time::Duration::from_secs(2))
.timeout(std::time::Duration::from_secs(5))
.redirect(reqwest::redirect::Policy::none())
.build()?;

let mut request_builder = client
.post(self.url.clone())
.header("Content-Type", "application/json");
if let (Some(user), Some(password)) = (&self.user, &self.password) {
let auth = base64::encode(format!("{}:{}", user, password));
request_builder =
request_builder.header("Authorization", format!("Basic {}", auth));
request_builder = request_builder.basic_auth(user.clone(), Some(password.clone()));
}
let request_body =
serde_json::to_string(&req).map_err(JsonRpcConnectorError::SerdeJsonError)?;
let request = request_builder
.body(Body::from(request_body))
.map_err(JsonRpcConnectorError::HttpError)?;
let response = client
.request(request)
let response = request_builder
.body(request_body)
.send()
.await
.map_err(JsonRpcConnectorError::HyperError)?;
let body_bytes = hyper::body::to_bytes(response.into_body())
.await
.map_err(JsonRpcConnectorError::HyperError)?;
.map_err(JsonRpcConnectorError::ReqwestError)?;

let status = response.status();
let body_bytes = response
.bytes()
.await
.map_err(JsonRpcConnectorError::ReqwestError)?;
let body_str = String::from_utf8_lossy(&body_bytes);

if body_str.contains("Work queue depth exceeded") {
if attempts >= max_attempts {
return Err(JsonRpcConnectorError::new(
Expand All @@ -121,6 +132,13 @@ impl JsonRpcConnector {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}
if !status.is_success() {
return Err(JsonRpcConnectorError::new(format!(
"HTTP Error: {}",
status
)));
}

let response: RpcResponse<R> = serde_json::from_slice(&body_bytes)
.map_err(JsonRpcConnectorError::SerdeJsonError)?;
return match response.error {
Expand Down Expand Up @@ -369,32 +387,33 @@ impl JsonRpcConnector {

/// Tests connection with zebrad / zebrad.
async fn test_node_connection(
uri: Uri,
url: Url,
user: Option<String>,
password: Option<String>,
) -> Result<(), JsonRpcConnectorError> {
let client = Client::builder().build::<_, Body>(HttpsConnector::new());
let client = Client::builder()
.connect_timeout(std::time::Duration::from_secs(2))
.timeout(std::time::Duration::from_secs(5))
.redirect(reqwest::redirect::Policy::none())
.build()?;

let user = user.unwrap_or_else(|| "xxxxxx".to_string());
let password = password.unwrap_or_else(|| "xxxxxx".to_string());
let encoded_auth = base64::encode(format!("{}:{}", user, password));

let request = Request::builder()
.method("POST")
.uri(uri.clone())
let request_body = r#"{"jsonrpc":"2.0","method":"getinfo","params":[],"id":1}"#;
let mut request_builder = client
.post(url.clone())
.header("Content-Type", "application/json")
.header("Authorization", format!("Basic {}", encoded_auth))
.body(Body::from(
r#"{"jsonrpc":"2.0","method":"getinfo","params":[],"id":1}"#,
))
.map_err(JsonRpcConnectorError::HttpError)?;
let response =
tokio::time::timeout(tokio::time::Duration::from_secs(3), client.request(request))
.await
.map_err(JsonRpcConnectorError::TimeoutError)??;
let body_bytes = hyper::body::to_bytes(response.into_body())
.body(request_body);
request_builder = request_builder.basic_auth(user, Some(password)); // Used basic_auth method

let response = request_builder
.send()
.await
.map_err(JsonRpcConnectorError::ReqwestError)?;
let body_bytes = response
.bytes()
.await
.map_err(JsonRpcConnectorError::HyperError)?;
.map_err(JsonRpcConnectorError::ReqwestError)?;
let _response: RpcResponse<serde_json::Value> =
serde_json::from_slice(&body_bytes).map_err(JsonRpcConnectorError::SerdeJsonError)?;
Ok(())
Expand All @@ -406,24 +425,20 @@ pub async fn test_node_and_return_uri(
user: Option<String>,
password: Option<String>,
) -> Result<Uri, JsonRpcConnectorError> {
let ipv4_uri: Uri = format!("http://127.0.0.1:{}", port)
.parse()
.map_err(JsonRpcConnectorError::InvalidUriError)?;
let ipv6_uri: Uri = format!("http://[::1]:{}", port)
.parse()
.map_err(JsonRpcConnectorError::InvalidUriError)?;
let ipv4_uri: Url = format!("http://127.0.0.1:{}", port).parse()?;
let ipv6_uri: Url = format!("http://[::1]:{}", port).parse()?;
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
for _ in 0..3 {
match test_node_connection(ipv4_uri.clone(), user.clone(), password.clone()).await {
Ok(_) => {
println!("Connected to node using IPv4 at address {}.", ipv4_uri);
return Ok(ipv4_uri);
return Ok(ipv4_uri.as_str().parse()?);
}
Err(_e_ipv4) => {
match test_node_connection(ipv6_uri.clone(), user.clone(), password.clone()).await {
Ok(_) => {
println!("Connected to node using IPv6 at address {}.", ipv6_uri);
return Ok(ipv6_uri);
return Ok(ipv6_uri.as_str().parse()?);
}
Err(_e_ipv6) => {
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
Expand Down
10 changes: 7 additions & 3 deletions zaino-fetch/src/jsonrpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ pub enum JsonRpcConnectorError {
#[error("Serialization/Deserialization Error: {0}")]
SerdeJsonError(#[from] serde_json::Error),

/// HTTP Request Errors.
/// Reqwest Based Errors.
#[error("HTTP Request Error: {0}")]
HyperError(#[from] hyper::Error),
ReqwestError(#[from] reqwest::Error),

///HTTP Errors.
#[error("HTTP Error: {0}")]
Expand All @@ -23,6 +23,10 @@ pub enum JsonRpcConnectorError {
#[error("Invalid URI: {0}")]
InvalidUriError(#[from] http::uri::InvalidUri),

/// Invalid URL Errors.
#[error("Invalid URL: {0}")]
InvalidUrlError(#[from] url::ParseError),

/// UTF-8 Conversion Errors.
#[error("UTF-8 Conversion Error")]
Utf8Error(#[from] std::string::FromUtf8Error),
Expand All @@ -46,7 +50,7 @@ impl JsonRpcConnectorError {
JsonRpcConnectorError::SerdeJsonError(_) => {
tonic::Status::invalid_argument(self.to_string())
}
JsonRpcConnectorError::HyperError(_) => tonic::Status::unavailable(self.to_string()),
JsonRpcConnectorError::ReqwestError(_) => tonic::Status::unavailable(self.to_string()),
JsonRpcConnectorError::HttpError(_) => tonic::Status::internal(self.to_string()),
_ => tonic::Status::internal(self.to_string()),
}
Expand Down
14 changes: 7 additions & 7 deletions zaino-serve/src/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl CompactTxStreamer for GrpcClient {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await
.await?
.get_blockchain_info()
.await
.map_err(|e| e.to_grpc_status())?;
Expand Down Expand Up @@ -320,7 +320,7 @@ impl CompactTxStreamer for GrpcClient {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await
.await?
.get_raw_transaction(hash_hex, Some(1))
.await
.map_err(|e| e.to_grpc_status())?;
Expand Down Expand Up @@ -372,7 +372,7 @@ impl CompactTxStreamer for GrpcClient {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await
.await?
.send_raw_transaction(hex_tx)
.await
.map_err(|e| e.to_grpc_status())?;
Expand Down Expand Up @@ -428,7 +428,7 @@ impl CompactTxStreamer for GrpcClient {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await;
.await?;
let txids = zebrad_client
.get_address_txids(vec![address], start, end)
.await
Expand Down Expand Up @@ -615,7 +615,7 @@ impl CompactTxStreamer for GrpcClient {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await;
.await?;

let zebrad_uri = self.zebrad_uri.clone();
let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32);
Expand Down Expand Up @@ -747,7 +747,7 @@ impl CompactTxStreamer for GrpcClient {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await;
.await?;

// TODO: This is slow. Chain, along with other blockchain info should be saved on startup and used here [blockcache?].
let chain = zebrad_client
Expand Down Expand Up @@ -907,7 +907,7 @@ impl CompactTxStreamer for GrpcClient {
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await;
.await?;

let zebra_info = zebrad_client
.get_info()
Expand Down

0 comments on commit 666808a

Please sign in to comment.