diff --git a/zingo-rpc/src/rpc/service.rs b/zingo-rpc/src/rpc/service.rs index 6a502ee..ae99841 100644 --- a/zingo-rpc/src/rpc/service.rs +++ b/zingo-rpc/src/rpc/service.rs @@ -1,6 +1,7 @@ //! Lightwallet service RPC implementations. use hex::FromHex; +use tokio::time::timeout; use tokio_stream::wrappers::ReceiverStream; use zcash_client_backend::proto::{ compact_formats::{CompactBlock, CompactTx}, @@ -190,7 +191,6 @@ impl CompactTxStreamer for ProxyClient { /// /// TODO: This implementation is slow. An internal block cache should be implemented that this rpc, along with the get_block rpc, can rely on. /// - add get_block function that queries the block cache for block and calls get_block_from_node to fetch block if not present. - /// TODO: Add 30s timeout. fn get_block_range<'life0, 'async_trait>( &'life0 self, request: tonic::Request, @@ -226,24 +226,36 @@ impl CompactTxStreamer for ProxyClient { } let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32); tokio::spawn(async move { - for height in (start..=end).rev() { - let compact_block = get_block_from_node(&zebrad_uri, &height).await; - match compact_block { - Ok(block) => { - if channel_tx.send(Ok(block)).await.is_err() { - break; + let timeout = timeout(std::time::Duration::from_secs(30), async { + for height in (start..=end).rev() { + let compact_block = get_block_from_node(&zebrad_uri, &height).await; + match compact_block { + Ok(block) => { + if channel_tx.send(Ok(block)).await.is_err() { + break; + } } - } - Err(e) => { - if channel_tx - .send(Err(tonic::Status::internal(e.to_string()))) - .await - .is_err() - { - break; + Err(e) => { + if channel_tx + .send(Err(tonic::Status::internal(e.to_string()))) + .await + .is_err() + { + break; + } } } } + }) + .await; + match timeout { + Ok(_) => {} + Err(_) => { + channel_tx + .send(Err(tonic::Status::internal("Request timed out"))) + .await + .ok(); + } } }); let output_stream = CompactBlockStream::new(channel_rx); @@ -390,8 +402,6 @@ impl CompactTxStreamer for ProxyClient { type GetTaddressTxidsStream = std::pin::Pin>; /// This name is misleading, returns the full transactions that have either inputs or outputs connected to the given transparent address. - /// - /// TODO: Add 30 second timout. fn get_taddress_txids<'life0, 'async_trait>( &'life0 self, request: tonic::Request< @@ -439,25 +449,26 @@ impl CompactTxStreamer for ProxyClient { .await .map_err(|e| e.to_grpc_status())?; - let (tx, rx) = tokio::sync::mpsc::channel(32); + let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32); tokio::spawn(async move { - for txid in txids.transactions { - let transaction = zebrad_client.get_raw_transaction(txid, Some(1)).await; - match transaction { - Ok(GetTransactionResponse::Object { hex, height, .. }) => { - if tx - .send(Ok(RawTransaction { - data: hex.bytes, - height: height as u64, - })) - .await - .is_err() - { - break; + let timeout = timeout(std::time::Duration::from_secs(30), async { + for txid in txids.transactions { + let transaction = zebrad_client.get_raw_transaction(txid, Some(1)).await; + match transaction { + Ok(GetTransactionResponse::Object { hex, height, .. }) => { + if channel_tx + .send(Ok(RawTransaction { + data: hex.bytes, + height: height as u64, + })) + .await + .is_err() + { + break; + } } - } - Ok(GetTransactionResponse::Raw(_)) => { - if tx + Ok(GetTransactionResponse::Raw(_)) => { + if channel_tx .send(Err(tonic::Status::internal( "Received raw transaction type, this should not be impossible.", ))) @@ -466,20 +477,31 @@ impl CompactTxStreamer for ProxyClient { { break; } - } - Err(e) => { - if tx - .send(Err(tonic::Status::internal(e.to_string()))) - .await - .is_err() - { - break; + } + Err(e) => { + if channel_tx + .send(Err(tonic::Status::internal(e.to_string()))) + .await + .is_err() + { + break; + } } } } + }) + .await; + match timeout { + Ok(_) => {} + Err(_) => { + channel_tx + .send(Err(tonic::Status::internal("Request timed out"))) + .await + .ok(); + } } }); - let output_stream = RawTransactionStream::new(rx); + let output_stream = RawTransactionStream::new(channel_rx); let stream_boxed = Box::pin(output_stream); Ok(tonic::Response::new(stream_boxed)) }) @@ -584,7 +606,6 @@ impl CompactTxStreamer for ProxyClient { /// /// TODO: This implementation is slow. Zingo-Proxy's blockcache state engine should keep its own internal mempool state. /// - This RPC should query Zingo-Proxy's internal mempool state rather than creating its own mempool and directly querying zebrad. - /// TODO: Add 30s timeout. fn get_mempool_stream<'life0, 'async_trait>( &'life0 self, _request: tonic::Request, @@ -613,83 +634,95 @@ impl CompactTxStreamer for ProxyClient { .await; let zebrad_uri = self.zebrad_uri.clone(); - let (tx, rx) = tokio::sync::mpsc::channel(32); + let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32); tokio::spawn(async move { - let mempool = Mempool::new(); - if let Err(e) = mempool.update(&zebrad_uri).await { - tx.send(Err(tonic::Status::internal(e.to_string()))) - .await - .ok(); - return; - } - let mut mined = false; - let mut txid_index: usize = 0; - while !mined { - match mempool.get_mempool_txids().await { - Ok(mempool_txids) => { - for txid in &mempool_txids[txid_index..] { - match zebrad_client - .get_raw_transaction(txid.clone(), Some(1)) - .await { - Ok(GetTransactionResponse::Object { hex, height, .. }) => { - txid_index += 1; - if tx - .send(Ok(RawTransaction { - data: hex.bytes, - height: height as u64, - })) + let timeout = timeout(std::time::Duration::from_secs(30), async { + let mempool = Mempool::new(); + if let Err(e) = mempool.update(&zebrad_uri).await { + channel_tx.send(Err(tonic::Status::internal(e.to_string()))) + .await + .ok(); + return; + } + let mut mined = false; + let mut txid_index: usize = 0; + while !mined { + match mempool.get_mempool_txids().await { + Ok(mempool_txids) => { + for txid in &mempool_txids[txid_index..] { + match zebrad_client + .get_raw_transaction(txid.clone(), Some(1)) + .await { + Ok(GetTransactionResponse::Object { hex, height, .. }) => { + txid_index += 1; + if channel_tx + .send(Ok(RawTransaction { + data: hex.bytes, + height: height as u64, + })) + .await + .is_err() + { + break; + } + } + Ok(GetTransactionResponse::Raw(_)) => { + if channel_tx + .send(Err(tonic::Status::internal( + "Received raw transaction type, this should not be impossible.", + ))) .await .is_err() { break; } - } - Ok(GetTransactionResponse::Raw(_)) => { - if tx - .send(Err(tonic::Status::internal( - "Received raw transaction type, this should not be impossible.", - ))) - .await - .is_err() - { + } + Err(e) => { + if channel_tx + .send(Err(tonic::Status::internal(e.to_string()))) + .await + .is_err() + { break; - } - } - Err(e) => { - if tx - .send(Err(tonic::Status::internal(e.to_string()))) - .await - .is_err() - { - break; + } } } } } + Err(e) => { + if channel_tx + .send(Err(tonic::Status::internal(e.to_string()))) + .await + .is_err() + { + break; + } + } } - Err(e) => { - if tx - .send(Err(tonic::Status::internal(e.to_string()))) - .await - .is_err() - { + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + mined = match mempool.update(&zebrad_uri).await { + Ok(mined) => mined, + Err(e) => { + channel_tx.send(Err(tonic::Status::internal(e.to_string()))) + .await + .ok(); break; } - } + }; + } + }) + .await; + match timeout { + Ok(_) => {} + Err(_) => { + channel_tx + .send(Err(tonic::Status::internal("Request timed out"))) + .await + .ok(); } - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - mined = match mempool.update(&zebrad_uri).await { - Ok(mined) => mined, - Err(e) => { - tx.send(Err(tonic::Status::internal(e.to_string()))) - .await - .ok(); - break; - } - }; } }); - let output_stream = RawTransactionStream::new(rx); + let output_stream = RawTransactionStream::new(channel_rx); let stream_boxed = Box::pin(output_stream); Ok(tonic::Response::new(stream_boxed)) })