Skip to content

Commit

Permalink
all tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
idky137 committed Jun 14, 2024
1 parent f2d5234 commit 84ef335
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 112 deletions.
110 changes: 68 additions & 42 deletions zingo-rpc/src/jsonrpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,63 +169,89 @@ impl JsonRpcConnector {
&self.uri
}

/// Sends a jsonRPC request and returns the response.``
/// Sends a jsonRPC request and returns the response.
///
/// NOTE/TODO: This function currently resends the call up to 5 times on a server response of "Work queue depth exceeded".
/// This is because the node's queue can become overloaded and stop servicing RPCs.
/// This functionality is weak and should be incorporated in Zingo-Proxy's queue mechanism [WIP] that handles various errors appropriately.
pub async fn send_request<T: Serialize, R: for<'de> Deserialize<'de>>(
&self,
method: &str,
params: T,
) -> Result<R, JsonRpcConnectorError> {
let id = self.id_counter.fetch_add(1, Ordering::SeqCst);
let client = Client::builder().build(HttpsConnector::new());
let mut request_builder = Request::builder()
.method("POST")
.uri(self.uri.clone())
.header("Content-Type", "application/json");
if let (Some(user), Some(password)) = (&self.user, &self.password) {
let auth = base64::encode(format!("{}:{}", user, password));
request_builder = request_builder.header("Authorization", format!("Basic {}", auth));
}
let req = RpcRequest {
jsonrpc: "2.0".to_string(),
method: method.to_string(),
params,
id,
};
let request_body = serde_json::to_string(&req).map_err(|e| {
JsonRpcConnectorError::new_with_source("Failed to serialize request", Box::new(e))
})?;
// println!("request body`: {:?}", request_body);
let request = request_builder
.body(Body::from(request_body))
.map_err(|e| {
JsonRpcConnectorError::new_with_source("Failed to build request", Box::new(e))
let max_attempts = 5;
let mut attempts = 0;
loop {
attempts += 1;
let client = Client::builder().build(HttpsConnector::new());
let mut request_builder = Request::builder()
.method("POST")
.uri(self.uri.clone())
.header("Content-Type", "application/json");
if let (Some(user), Some(password)) = (&self.user, &self.password) {
let auth = base64::encode(format!("{}:{}", user, password));
request_builder =
request_builder.header("Authorization", format!("Basic {}", auth));
}
let request_body = serde_json::to_string(&req).map_err(|e| {
JsonRpcConnectorError::new_with_source("Failed to serialize request", Box::new(e))
})?;
// println!("request: {:?}", request);
let response = client.request(request).await.map_err(|e| {
JsonRpcConnectorError::new_with_source("HTTP request failed", Box::new(e))
})?;
let body_bytes = hyper::body::to_bytes(response.into_body())
.await
.map_err(|e| {
JsonRpcConnectorError::new_with_source("Failed to read response body", Box::new(e))
let request = request_builder
.body(Body::from(request_body))
.map_err(|e| {
JsonRpcConnectorError::new_with_source("Failed to build request", Box::new(e))
})?;
let response = client.request(request).await.map_err(|e| {
JsonRpcConnectorError::new_with_source("HTTP request failed", Box::new(e))
})?;
let body_bytes = hyper::body::to_bytes(response.into_body())
.await
.map_err(|e| {
JsonRpcConnectorError::new_with_source(
"Failed to read response body",
Box::new(e),
)
})?;

// let test_response: RpcResponse<R> =
// serde_json::from_slice(&body_bytes).unwrap_or_else(|e| {
// panic!(
// "Failed to deserialize response: {}\nBody bytes: {:?}",
// e,
// String::from_utf8_lossy(&body_bytes)
// )
// });
let body_str = String::from_utf8_lossy(&body_bytes);
if body_str.contains("Work queue depth exceeded") {
if attempts >= max_attempts {
return Err(JsonRpcConnectorError::new(
"Work queue depth exceeded after multiple attempts",
));
}
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}

// NOTE: This is useful for development but is not clear to users and should be simplified or completely removed before production.
// println!(
// "@zingoproxyd: Received response from {} call to node: {:#?}",
// method.to_string(),
// body_bytes
// );

let response: RpcResponse<R> = serde_json::from_slice(&body_bytes).map_err(|e| {
JsonRpcConnectorError::new_with_source("Failed to deserialize response", Box::new(e))
})?;
match response.error {
Some(error) => Err(JsonRpcConnectorError::new(format!(
"RPC Error {}: {}",
error.code, error.message
))),
None => Ok(response.result),
let response: RpcResponse<R> = serde_json::from_slice(&body_bytes).map_err(|e| {
JsonRpcConnectorError::new_with_source(
"Failed to deserialize response",
Box::new(e),
)
})?;
return match response.error {
Some(error) => Err(JsonRpcConnectorError::new(format!(
"RPC Error {}: {}",
error.code, error.message
))),
None => Ok(response.result),
};
}
}

Expand Down
140 changes: 70 additions & 70 deletions zingo-rpc/src/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
use hex::FromHex;
use tokio_stream::wrappers::ReceiverStream;
use zcash_client_backend::proto::{
compact_formats::{ChainMetadata, CompactBlock, CompactTx},
compact_formats::{CompactBlock, CompactTx},
service::{
compact_tx_streamer_server::CompactTxStreamer, Address, Balance, BlockId, BlockRange,
Empty, GetAddressUtxosReply, LightdInfo, RawTransaction, SubtreeRoot,
compact_tx_streamer_server::CompactTxStreamer, Address, Balance, BlockId, Empty,
GetAddressUtxosReply, LightdInfo, RawTransaction, SubtreeRoot,
},
};
use zebra_chain::block::Height;
Expand All @@ -16,7 +16,7 @@ use crate::{
define_grpc_passthrough,
jsonrpc::{
connector::JsonRpcConnector,
primitives::{GetBlockResponse, GetTransactionResponse, ProxyConsensusBranchIdHex},
primitives::{GetTransactionResponse, ProxyConsensusBranchIdHex},
},
primitives::ProxyClient,
utils::get_build_info,
Expand Down Expand Up @@ -668,72 +668,72 @@ impl CompactTxStreamer for ProxyClient {
) -> Self::GetMempoolStreamStream
);

// /// GetTreeState returns the note commitment tree state corresponding to the given block.
// /// See section 3.7 of the Zcash protocol specification. It returns several other useful
// /// values also (even though they can be obtained using GetBlock).
// /// The block can be specified by either height or hash.
// fn get_tree_state<'life0, 'async_trait>(
// &'life0 self,
// request: tonic::Request<zcash_client_backend::proto::service::BlockId>,
// ) -> core::pin::Pin<
// Box<
// dyn core::future::Future<
// Output = std::result::Result<
// tonic::Response<zcash_client_backend::proto::service::TreeState>,
// tonic::Status,
// >,
// > + core::marker::Send
// + 'async_trait,
// >,
// >
// where
// 'life0: 'async_trait,
// Self: 'async_trait,
// {
// println!("@zingoproxyd: Received call of get_tree_state.");
// Box::pin(async {
// let block_id = request.into_inner();
// let hash_or_height = if block_id.height != 0 {
// block_id.height.to_string()
// } else {
// hex::encode(block_id.hash)
// };

// let zebrad_client = JsonRpcConnector::new(
// self.zebrad_uri.clone(),
// Some("xxxxxx".to_string()),
// Some("xxxxxx".to_string()),
// )
// .await;

// // TODO: This is slow. Chain, along with other blockchain info should be saved on startup and used here [blockcache?].
// let chain = zebrad_client
// .get_blockchain_info()
// .await
// .map_err(|e| e.to_grpc_status())?
// .chain;
// let treestate = zebrad_client
// .get_treestate(hash_or_height)
// .await
// .map_err(|e| e.to_grpc_status())?;
// Ok(tonic::Response::new(
// zcash_client_backend::proto::service::TreeState {
// network: chain,
// height: treestate.height as u64,
// hash: treestate.hash.to_string(),
// time: treestate.time,
// sapling_tree: treestate.sapling.commitments.final_state.to_string(),
// orchard_tree: treestate.orchard.commitments.final_state.to_string(),
// },
// ))
// })
// }
define_grpc_passthrough!(
fn get_tree_state(
&self,
request: tonic::Request<BlockId>,
) -> zcash_client_backend::proto::service::TreeState
);
/// GetTreeState returns the note commitment tree state corresponding to the given block.
/// See section 3.7 of the Zcash protocol specification. It returns several other useful
/// values also (even though they can be obtained using GetBlock).
/// The block can be specified by either height or hash.
fn get_tree_state<'life0, 'async_trait>(
&'life0 self,
request: tonic::Request<zcash_client_backend::proto::service::BlockId>,
) -> core::pin::Pin<
Box<
dyn core::future::Future<
Output = std::result::Result<
tonic::Response<zcash_client_backend::proto::service::TreeState>,
tonic::Status,
>,
> + core::marker::Send
+ 'async_trait,
>,
>
where
'life0: 'async_trait,
Self: 'async_trait,
{
println!("@zingoproxyd: Received call of get_tree_state.");
Box::pin(async {
let block_id = request.into_inner();
let hash_or_height = if block_id.height != 0 {
block_id.height.to_string()
} else {
hex::encode(block_id.hash)
};

let zebrad_client = JsonRpcConnector::new(
self.zebrad_uri.clone(),
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await;

// TODO: This is slow. Chain, along with other blockchain info should be saved on startup and used here [blockcache?].
let chain = zebrad_client
.get_blockchain_info()
.await
.map_err(|e| e.to_grpc_status())?
.chain;
let treestate = zebrad_client
.get_treestate(hash_or_height)
.await
.map_err(|e| e.to_grpc_status())?;
Ok(tonic::Response::new(
zcash_client_backend::proto::service::TreeState {
network: chain,
height: treestate.height as u64,
hash: treestate.hash.to_string(),
time: treestate.time,
sapling_tree: treestate.sapling.commitments.final_state.to_string(),
orchard_tree: treestate.orchard.commitments.final_state.to_string(),
},
))
})
}
// define_grpc_passthrough!(
// fn get_tree_state(
// &self,
// request: tonic::Request<BlockId>,
// ) -> zcash_client_backend::proto::service::TreeState
// );

/// This RPC has not been implemented as it is not currently used by zingolib.
/// If you require this RPC please open an issue or PR at the Zingo-Proxy github (https://github.com/zingolabs/zingo-proxy).
Expand Down

0 comments on commit 84ef335

Please sign in to comment.