Skip to content

Commit

Permalink
added 30s timeout to streaming rpcs
Browse files Browse the repository at this point in the history
  • Loading branch information
idky137 committed Jun 18, 2024
1 parent 917072e commit 5460808
Showing 1 changed file with 137 additions and 104 deletions.
241 changes: 137 additions & 104 deletions zingo-rpc/src/rpc/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Lightwallet service RPC implementations.

use hex::FromHex;
use tokio::time::timeout;
use tokio_stream::wrappers::ReceiverStream;
use zcash_client_backend::proto::{
compact_formats::{CompactBlock, CompactTx},
Expand Down Expand Up @@ -190,7 +191,6 @@ impl CompactTxStreamer for ProxyClient {
///
/// TODO: This implementation is slow. An internal block cache should be implemented that this rpc, along with the get_block rpc, can rely on.
/// - add get_block function that queries the block cache for block and calls get_block_from_node to fetch block if not present.
/// TODO: Add 30s timeout.
fn get_block_range<'life0, 'async_trait>(
&'life0 self,
request: tonic::Request<zcash_client_backend::proto::service::BlockRange>,
Expand Down Expand Up @@ -226,24 +226,36 @@ impl CompactTxStreamer for ProxyClient {
}
let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
for height in (start..=end).rev() {
let compact_block = get_block_from_node(&zebrad_uri, &height).await;
match compact_block {
Ok(block) => {
if channel_tx.send(Ok(block)).await.is_err() {
break;
let timeout = timeout(std::time::Duration::from_secs(30), async {
for height in (start..=end).rev() {
let compact_block = get_block_from_node(&zebrad_uri, &height).await;
match compact_block {
Ok(block) => {
if channel_tx.send(Ok(block)).await.is_err() {
break;
}
}
}
Err(e) => {
if channel_tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
break;
Err(e) => {
if channel_tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
break;
}
}
}
}
})
.await;
match timeout {
Ok(_) => {}
Err(_) => {
channel_tx
.send(Err(tonic::Status::internal("Request timed out")))
.await
.ok();
}
}
});
let output_stream = CompactBlockStream::new(channel_rx);
Expand Down Expand Up @@ -390,8 +402,6 @@ impl CompactTxStreamer for ProxyClient {
type GetTaddressTxidsStream = std::pin::Pin<Box<RawTransactionStream>>;

/// This name is misleading, returns the full transactions that have either inputs or outputs connected to the given transparent address.
///
/// TODO: Add 30 second timout.
fn get_taddress_txids<'life0, 'async_trait>(
&'life0 self,
request: tonic::Request<
Expand Down Expand Up @@ -439,25 +449,26 @@ impl CompactTxStreamer for ProxyClient {
.await
.map_err(|e| e.to_grpc_status())?;

let (tx, rx) = tokio::sync::mpsc::channel(32);
let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
for txid in txids.transactions {
let transaction = zebrad_client.get_raw_transaction(txid, Some(1)).await;
match transaction {
Ok(GetTransactionResponse::Object { hex, height, .. }) => {
if tx
.send(Ok(RawTransaction {
data: hex.bytes,
height: height as u64,
}))
.await
.is_err()
{
break;
let timeout = timeout(std::time::Duration::from_secs(30), async {
for txid in txids.transactions {
let transaction = zebrad_client.get_raw_transaction(txid, Some(1)).await;
match transaction {
Ok(GetTransactionResponse::Object { hex, height, .. }) => {
if channel_tx
.send(Ok(RawTransaction {
data: hex.bytes,
height: height as u64,
}))
.await
.is_err()
{
break;
}
}
}
Ok(GetTransactionResponse::Raw(_)) => {
if tx
Ok(GetTransactionResponse::Raw(_)) => {
if channel_tx
.send(Err(tonic::Status::internal(
"Received raw transaction type, this should not be impossible.",
)))
Expand All @@ -466,20 +477,31 @@ impl CompactTxStreamer for ProxyClient {
{
break;
}
}
Err(e) => {
if tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
break;
}
Err(e) => {
if channel_tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
break;
}
}
}
}
})
.await;
match timeout {
Ok(_) => {}
Err(_) => {
channel_tx
.send(Err(tonic::Status::internal("Request timed out")))
.await
.ok();
}
}
});
let output_stream = RawTransactionStream::new(rx);
let output_stream = RawTransactionStream::new(channel_rx);
let stream_boxed = Box::pin(output_stream);
Ok(tonic::Response::new(stream_boxed))
})
Expand Down Expand Up @@ -584,7 +606,6 @@ impl CompactTxStreamer for ProxyClient {
///
/// TODO: This implementation is slow. Zingo-Proxy's blockcache state engine should keep its own internal mempool state.
/// - This RPC should query Zingo-Proxy's internal mempool state rather than creating its own mempool and directly querying zebrad.
/// TODO: Add 30s timeout.
fn get_mempool_stream<'life0, 'async_trait>(
&'life0 self,
_request: tonic::Request<Empty>,
Expand Down Expand Up @@ -613,83 +634,95 @@ impl CompactTxStreamer for ProxyClient {
.await;

let zebrad_uri = self.zebrad_uri.clone();
let (tx, rx) = tokio::sync::mpsc::channel(32);
let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
let mempool = Mempool::new();
if let Err(e) = mempool.update(&zebrad_uri).await {
tx.send(Err(tonic::Status::internal(e.to_string())))
.await
.ok();
return;
}
let mut mined = false;
let mut txid_index: usize = 0;
while !mined {
match mempool.get_mempool_txids().await {
Ok(mempool_txids) => {
for txid in &mempool_txids[txid_index..] {
match zebrad_client
.get_raw_transaction(txid.clone(), Some(1))
.await {
Ok(GetTransactionResponse::Object { hex, height, .. }) => {
txid_index += 1;
if tx
.send(Ok(RawTransaction {
data: hex.bytes,
height: height as u64,
}))
let timeout = timeout(std::time::Duration::from_secs(30), async {
let mempool = Mempool::new();
if let Err(e) = mempool.update(&zebrad_uri).await {
channel_tx.send(Err(tonic::Status::internal(e.to_string())))
.await
.ok();
return;
}
let mut mined = false;
let mut txid_index: usize = 0;
while !mined {
match mempool.get_mempool_txids().await {
Ok(mempool_txids) => {
for txid in &mempool_txids[txid_index..] {
match zebrad_client
.get_raw_transaction(txid.clone(), Some(1))
.await {
Ok(GetTransactionResponse::Object { hex, height, .. }) => {
txid_index += 1;
if channel_tx
.send(Ok(RawTransaction {
data: hex.bytes,
height: height as u64,
}))
.await
.is_err()
{
break;
}
}
Ok(GetTransactionResponse::Raw(_)) => {
if channel_tx
.send(Err(tonic::Status::internal(
"Received raw transaction type, this should not be impossible.",
)))
.await
.is_err()
{
break;
}
}
Ok(GetTransactionResponse::Raw(_)) => {
if tx
.send(Err(tonic::Status::internal(
"Received raw transaction type, this should not be impossible.",
)))
.await
.is_err()
{
}
Err(e) => {
if channel_tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
break;
}
}
Err(e) => {
if tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
break;
}
}
}
}
}
Err(e) => {
if channel_tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
break;
}
}
}
Err(e) => {
if tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
mined = match mempool.update(&zebrad_uri).await {
Ok(mined) => mined,
Err(e) => {
channel_tx.send(Err(tonic::Status::internal(e.to_string())))
.await
.ok();
break;
}
}
};
}
})
.await;
match timeout {
Ok(_) => {}
Err(_) => {
channel_tx
.send(Err(tonic::Status::internal("Request timed out")))
.await
.ok();
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
mined = match mempool.update(&zebrad_uri).await {
Ok(mined) => mined,
Err(e) => {
tx.send(Err(tonic::Status::internal(e.to_string())))
.await
.ok();
break;
}
};
}
});
let output_stream = RawTransactionStream::new(rx);
let output_stream = RawTransactionStream::new(channel_rx);
let stream_boxed = Box::pin(output_stream);
Ok(tonic::Response::new(stream_boxed))
})
Expand Down

0 comments on commit 5460808

Please sign in to comment.