Skip to content
This repository has been archived by the owner on Nov 5, 2023. It is now read-only.

Commit

Permalink
feat(rpc-testing-utils) : Implement trace_filter Stream Extension for…
Browse files Browse the repository at this point in the history
… TraceApiExt (paradigmxyz#5282)
  • Loading branch information
DoTheBestToGetTheBest authored Nov 4, 2023
1 parent 219b7db commit bcd8e6f
Showing 1 changed file with 49 additions and 1 deletion.
50 changes: 49 additions & 1 deletion crates/rpc/rpc-testing-util/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use jsonrpsee::core::Error as RpcError;
use reth_primitives::{BlockId, Bytes, TxHash, B256};
use reth_rpc_api::clients::TraceApiClient;
use reth_rpc_types::{
trace::parity::{LocalizedTransactionTrace, TraceResults, TraceType},
trace::{
filter::TraceFilter,
parity::{LocalizedTransactionTrace, TraceResults, TraceType},
},
CallRequest, Index,
};
use std::{
Expand All @@ -31,6 +34,9 @@ pub type CallManyTraceResult = Result<
/// index.
pub type TraceGetResult =
Result<(Option<LocalizedTransactionTrace>, B256, Vec<Index>), (RpcError, B256, Vec<Index>)>;
/// Represents a result type for the `trace_filter` stream extension.
pub type TraceFilterResult =
Result<(Vec<LocalizedTransactionTrace>, TraceFilter), (RpcError, TraceFilter)>;

/// An extension trait for the Trace API.
#[async_trait::async_trait]
Expand Down Expand Up @@ -86,6 +92,33 @@ pub trait TraceApiExt {
fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
where
I: IntoIterator<Item = Index>;

/// Returns a new stream that yields traces for given filters.
fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
where
I: IntoIterator<Item = TraceFilter>;
}

/// Represents a stream that asynchronously yields the results of the `trace_filter` method.
#[must_use = "streams do nothing unless polled"]
pub struct TraceFilterStream<'a> {
stream: Pin<Box<dyn Stream<Item = TraceFilterResult> + 'a>>,
}

impl<'a> Stream for TraceFilterStream<'a> {
type Item = TraceFilterResult;

/// Attempts to pull out the next value of the stream.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next(cx)
}
}

impl<'a> std::fmt::Debug for TraceFilterStream<'a> {
/// Provides a debug representation of the `TraceFilterStream`.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TraceFilterStream").finish_non_exhaustive()
}
}
/// A stream that asynchronously yields the results of the `trace_get` method for a given
/// transaction hash and a series of indices.
Expand Down Expand Up @@ -274,6 +307,21 @@ impl<T: TraceApiClient + Sync> TraceApiExt for T {
.buffered(10);
TraceGetStream { stream: Box::pin(stream) }
}

fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
where
I: IntoIterator<Item = TraceFilter>,
{
let filter_list = filters.into_iter().collect::<Vec<_>>();
let stream = futures::stream::iter(filter_list.into_iter().map(move |filter| async move {
match self.trace_filter(filter.clone()).await {
Ok(result) => Ok((result, filter)),
Err(err) => Err((err, filter)),
}
}))
.buffered(10);
TraceFilterStream { stream: Box::pin(stream) }
}
}

/// A stream that yields the traces for the requested blocks.
Expand Down

0 comments on commit bcd8e6f

Please sign in to comment.