diff --git a/zingo-rpc/src/jsonrpc/connector.rs b/zingo-rpc/src/jsonrpc/connector.rs index 750a60c..48ff6f0 100644 --- a/zingo-rpc/src/jsonrpc/connector.rs +++ b/zingo-rpc/src/jsonrpc/connector.rs @@ -169,63 +169,89 @@ impl JsonRpcConnector { &self.uri } - /// Sends a jsonRPC request and returns the response.`` + /// Sends a jsonRPC request and returns the response. + /// + /// NOTE/TODO: This function currently resends the call up to 5 times on a server response of "Work queue depth exceeded". + /// This is because the node's queue can become overloaded and stop servicing RPCs. + /// This functionality is weak and should be incorporated in Zingo-Proxy's queue mechanism [WIP] that handles various errors appropriately. pub async fn send_request Deserialize<'de>>( &self, method: &str, params: T, ) -> Result { let id = self.id_counter.fetch_add(1, Ordering::SeqCst); - let client = Client::builder().build(HttpsConnector::new()); - let mut request_builder = Request::builder() - .method("POST") - .uri(self.uri.clone()) - .header("Content-Type", "application/json"); - if let (Some(user), Some(password)) = (&self.user, &self.password) { - let auth = base64::encode(format!("{}:{}", user, password)); - request_builder = request_builder.header("Authorization", format!("Basic {}", auth)); - } let req = RpcRequest { jsonrpc: "2.0".to_string(), method: method.to_string(), params, id, }; - let request_body = serde_json::to_string(&req).map_err(|e| { - JsonRpcConnectorError::new_with_source("Failed to serialize request", Box::new(e)) - })?; - // println!("request body`: {:?}", request_body); - let request = request_builder - .body(Body::from(request_body)) - .map_err(|e| { - JsonRpcConnectorError::new_with_source("Failed to build request", Box::new(e)) + let max_attempts = 5; + let mut attempts = 0; + loop { + attempts += 1; + let client = Client::builder().build(HttpsConnector::new()); + let mut request_builder = Request::builder() + .method("POST") + .uri(self.uri.clone()) + .header("Content-Type", "application/json"); + if let (Some(user), Some(password)) = (&self.user, &self.password) { + let auth = base64::encode(format!("{}:{}", user, password)); + request_builder = + request_builder.header("Authorization", format!("Basic {}", auth)); + } + let request_body = serde_json::to_string(&req).map_err(|e| { + JsonRpcConnectorError::new_with_source("Failed to serialize request", Box::new(e)) })?; - // println!("request: {:?}", request); - let response = client.request(request).await.map_err(|e| { - JsonRpcConnectorError::new_with_source("HTTP request failed", Box::new(e)) - })?; - let body_bytes = hyper::body::to_bytes(response.into_body()) - .await - .map_err(|e| { - JsonRpcConnectorError::new_with_source("Failed to read response body", Box::new(e)) + let request = request_builder + .body(Body::from(request_body)) + .map_err(|e| { + JsonRpcConnectorError::new_with_source("Failed to build request", Box::new(e)) + })?; + let response = client.request(request).await.map_err(|e| { + JsonRpcConnectorError::new_with_source("HTTP request failed", Box::new(e)) })?; + let body_bytes = hyper::body::to_bytes(response.into_body()) + .await + .map_err(|e| { + JsonRpcConnectorError::new_with_source( + "Failed to read response body", + Box::new(e), + ) + })?; + + // let test_response: RpcResponse = + // serde_json::from_slice(&body_bytes).unwrap_or_else(|e| { + // panic!( + // "Failed to deserialize response: {}\nBody bytes: {:?}", + // e, + // String::from_utf8_lossy(&body_bytes) + // ) + // }); + let body_str = String::from_utf8_lossy(&body_bytes); + if body_str.contains("Work queue depth exceeded") { + if attempts >= max_attempts { + return Err(JsonRpcConnectorError::new( + "Work queue depth exceeded after multiple attempts", + )); + } + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + continue; + } - // NOTE: This is useful for development but is not clear to users and should be simplified or completely removed before production. - // println!( - // "@zingoproxyd: Received response from {} call to node: {:#?}", - // method.to_string(), - // body_bytes - // ); - - let response: RpcResponse = serde_json::from_slice(&body_bytes).map_err(|e| { - JsonRpcConnectorError::new_with_source("Failed to deserialize response", Box::new(e)) - })?; - match response.error { - Some(error) => Err(JsonRpcConnectorError::new(format!( - "RPC Error {}: {}", - error.code, error.message - ))), - None => Ok(response.result), + let response: RpcResponse = serde_json::from_slice(&body_bytes).map_err(|e| { + JsonRpcConnectorError::new_with_source( + "Failed to deserialize response", + Box::new(e), + ) + })?; + return match response.error { + Some(error) => Err(JsonRpcConnectorError::new(format!( + "RPC Error {}: {}", + error.code, error.message + ))), + None => Ok(response.result), + }; } } diff --git a/zingo-rpc/src/rpc/service.rs b/zingo-rpc/src/rpc/service.rs index 79b93ca..056aa56 100644 --- a/zingo-rpc/src/rpc/service.rs +++ b/zingo-rpc/src/rpc/service.rs @@ -3,10 +3,10 @@ use hex::FromHex; use tokio_stream::wrappers::ReceiverStream; use zcash_client_backend::proto::{ - compact_formats::{ChainMetadata, CompactBlock, CompactTx}, + compact_formats::{CompactBlock, CompactTx}, service::{ - compact_tx_streamer_server::CompactTxStreamer, Address, Balance, BlockId, BlockRange, - Empty, GetAddressUtxosReply, LightdInfo, RawTransaction, SubtreeRoot, + compact_tx_streamer_server::CompactTxStreamer, Address, Balance, BlockId, Empty, + GetAddressUtxosReply, LightdInfo, RawTransaction, SubtreeRoot, }, }; use zebra_chain::block::Height; @@ -16,7 +16,7 @@ use crate::{ define_grpc_passthrough, jsonrpc::{ connector::JsonRpcConnector, - primitives::{GetBlockResponse, GetTransactionResponse, ProxyConsensusBranchIdHex}, + primitives::{GetTransactionResponse, ProxyConsensusBranchIdHex}, }, primitives::ProxyClient, utils::get_build_info, @@ -668,72 +668,72 @@ impl CompactTxStreamer for ProxyClient { ) -> Self::GetMempoolStreamStream ); - // /// GetTreeState returns the note commitment tree state corresponding to the given block. - // /// See section 3.7 of the Zcash protocol specification. It returns several other useful - // /// values also (even though they can be obtained using GetBlock). - // /// The block can be specified by either height or hash. - // fn get_tree_state<'life0, 'async_trait>( - // &'life0 self, - // request: tonic::Request, - // ) -> core::pin::Pin< - // Box< - // dyn core::future::Future< - // Output = std::result::Result< - // tonic::Response, - // tonic::Status, - // >, - // > + core::marker::Send - // + 'async_trait, - // >, - // > - // where - // 'life0: 'async_trait, - // Self: 'async_trait, - // { - // println!("@zingoproxyd: Received call of get_tree_state."); - // Box::pin(async { - // let block_id = request.into_inner(); - // let hash_or_height = if block_id.height != 0 { - // block_id.height.to_string() - // } else { - // hex::encode(block_id.hash) - // }; - - // let zebrad_client = JsonRpcConnector::new( - // self.zebrad_uri.clone(), - // Some("xxxxxx".to_string()), - // Some("xxxxxx".to_string()), - // ) - // .await; - - // // TODO: This is slow. Chain, along with other blockchain info should be saved on startup and used here [blockcache?]. - // let chain = zebrad_client - // .get_blockchain_info() - // .await - // .map_err(|e| e.to_grpc_status())? - // .chain; - // let treestate = zebrad_client - // .get_treestate(hash_or_height) - // .await - // .map_err(|e| e.to_grpc_status())?; - // Ok(tonic::Response::new( - // zcash_client_backend::proto::service::TreeState { - // network: chain, - // height: treestate.height as u64, - // hash: treestate.hash.to_string(), - // time: treestate.time, - // sapling_tree: treestate.sapling.commitments.final_state.to_string(), - // orchard_tree: treestate.orchard.commitments.final_state.to_string(), - // }, - // )) - // }) - // } - define_grpc_passthrough!( - fn get_tree_state( - &self, - request: tonic::Request, - ) -> zcash_client_backend::proto::service::TreeState - ); + /// GetTreeState returns the note commitment tree state corresponding to the given block. + /// See section 3.7 of the Zcash protocol specification. It returns several other useful + /// values also (even though they can be obtained using GetBlock). + /// The block can be specified by either height or hash. + fn get_tree_state<'life0, 'async_trait>( + &'life0 self, + request: tonic::Request, + ) -> core::pin::Pin< + Box< + dyn core::future::Future< + Output = std::result::Result< + tonic::Response, + tonic::Status, + >, + > + core::marker::Send + + 'async_trait, + >, + > + where + 'life0: 'async_trait, + Self: 'async_trait, + { + println!("@zingoproxyd: Received call of get_tree_state."); + Box::pin(async { + let block_id = request.into_inner(); + let hash_or_height = if block_id.height != 0 { + block_id.height.to_string() + } else { + hex::encode(block_id.hash) + }; + + let zebrad_client = JsonRpcConnector::new( + self.zebrad_uri.clone(), + Some("xxxxxx".to_string()), + Some("xxxxxx".to_string()), + ) + .await; + + // TODO: This is slow. Chain, along with other blockchain info should be saved on startup and used here [blockcache?]. + let chain = zebrad_client + .get_blockchain_info() + .await + .map_err(|e| e.to_grpc_status())? + .chain; + let treestate = zebrad_client + .get_treestate(hash_or_height) + .await + .map_err(|e| e.to_grpc_status())?; + Ok(tonic::Response::new( + zcash_client_backend::proto::service::TreeState { + network: chain, + height: treestate.height as u64, + hash: treestate.hash.to_string(), + time: treestate.time, + sapling_tree: treestate.sapling.commitments.final_state.to_string(), + orchard_tree: treestate.orchard.commitments.final_state.to_string(), + }, + )) + }) + } + // define_grpc_passthrough!( + // fn get_tree_state( + // &self, + // request: tonic::Request, + // ) -> zcash_client_backend::proto::service::TreeState + // ); /// This RPC has not been implemented as it is not currently used by zingolib. /// If you require this RPC please open an issue or PR at the Zingo-Proxy github (https://github.com/zingolabs/zingo-proxy).