Skip to content

Commit

Permalink
implemented get_block_range_nullifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
idky137 committed Oct 30, 2024
1 parent 18ba6aa commit d28da9d
Showing 1 changed file with 57 additions and 6 deletions.
63 changes: 57 additions & 6 deletions zaino-serve/src/rpc/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,14 @@ impl CompactTxStreamer for GrpcClient {

/// Server streaming response type for the GetBlockRangeNullifiers method.
#[doc = " Server streaming response type for the GetBlockRangeNullifiers method."]
type GetBlockRangeNullifiersStream = tonic::Streaming<CompactBlock>;
type GetBlockRangeNullifiersStream = std::pin::Pin<Box<CompactBlockStream>>;

/// Same as GetBlockRange except actions contain only nullifiers.
///
/// This RPC has not been implemented as it is not currently used by zingolib.
/// If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer).
/// NOTE: This should be reimplemented with the introduction of the BlockCache.
fn get_block_range_nullifiers<'life0, 'async_trait>(
&'life0 self,
_request: tonic::Request<BlockRange>,
request: tonic::Request<BlockRange>,
) -> core::pin::Pin<
Box<
dyn core::future::Future<
Expand All @@ -298,8 +297,60 @@ impl CompactTxStreamer for GrpcClient {
Self: 'async_trait,
{
println!("[TEST] Received call of get_block_range_nullifiers.");
Box::pin(async {
Err(tonic::Status::unimplemented("get_block_range_nullifiers not yet implemented. If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer)."))
let zebrad_uri = self.zebrad_uri.clone();
Box::pin(async move {
let blockrange = request.into_inner();
let mut start = blockrange
.start
.map(|s| s.height as u32)
.ok_or(tonic::Status::invalid_argument("Start block not specified"))?;
let mut end = blockrange
.end
.map(|e| e.height as u32)
.ok_or(tonic::Status::invalid_argument("End block not specified"))?;
if start > end {
(start, end) = (end, start);
}
let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32);
tokio::spawn(async move {
// NOTE: This timeout is so slow due to the blockcache not being implemented. This should be reduced to 30s once functionality is in place.
let timeout = timeout(std::time::Duration::from_secs(120), async {
for height in start..=end {
let compact_block = get_nullifiers_from_node(&zebrad_uri, &height).await;
match compact_block {
Ok(block) => {
if channel_tx.send(Ok(block)).await.is_err() {
break;
}
}
Err(e) => {
if channel_tx
.send(Err(tonic::Status::internal(e.to_string())))
.await
.is_err()
{
break;
}
}
}
}
})
.await;
match timeout {
Ok(_) => {}
Err(_) => {
channel_tx
.send(Err(tonic::Status::internal(
"get_block_range_nullifiers gRPC request timed out",
)))
.await
.ok();
}
}
});
let output_stream = CompactBlockStream::new(channel_rx);
let stream_boxed = Box::pin(output_stream);
Ok(tonic::Response::new(stream_boxed))
})
}

Expand Down

0 comments on commit d28da9d

Please sign in to comment.