Skip to content

Commit

Permalink
load all block data for single block request
Browse files Browse the repository at this point in the history
  • Loading branch information
tmcgroul committed Dec 11, 2023
1 parent b891c58 commit c634513
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 11 deletions.
13 changes: 13 additions & 0 deletions src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,37 @@ use tracing::debug;
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LogRequest {
#[serde(skip_serializing_if = "Vec::is_empty")]
pub address: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub topic0: Vec<String>,
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub transaction: bool,
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub transaction_traces: bool,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct TxRequest {
#[serde(skip_serializing_if = "Vec::is_empty")]
pub to: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub sighash: Vec<String>,
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub traces: bool,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct TraceRequest {
#[serde(skip_serializing_if = "Vec::is_empty")]
pub call_to: Vec<String>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub call_sighash: Vec<String>,
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub transaction: bool,
#[serde(skip_serializing_if = "std::ops::Not::not")]
pub parents: bool,
}

Expand Down Expand Up @@ -132,6 +143,8 @@ pub struct BatchRequest {
#[serde(skip_serializing_if = "Option::is_none")]
pub logs: Option<Vec<LogRequest>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub transactions: Option<Vec<TxRequest>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub traces: Option<Vec<TraceRequest>>,
}

Expand Down
16 changes: 14 additions & 2 deletions src/datasource.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
use futures_core::stream::Stream;

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct LogRequest {
pub address: Vec<String>,
pub topic0: Vec<String>,
pub transaction: bool,
pub transaction_traces: bool,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, Default)]
pub struct TxRequest {
pub address: Vec<String>,
pub sighash: Vec<String>,
pub traces: bool,
}

#[derive(Debug, Clone, Default)]
pub struct TraceRequest {
pub address: Vec<String>,
pub sighash: Vec<String>,
pub transaction: bool,
pub parents: bool,
}

#[derive(Debug, Clone)]
pub struct DataRequest {
pub from: u64,
pub to: Option<u64>,
pub logs: Vec<LogRequest>,
pub transactions: Vec<TxRequest>,
pub traces: Vec<TraceRequest>,
}

Expand Down
48 changes: 43 additions & 5 deletions src/ds_archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
archive,
archive::{
Archive, BatchRequest, BlockFieldSelection, FieldSelection, LogFieldSelection, LogRequest,
TraceFieldSelection, TxFieldSelection, TraceRequest,
TraceFieldSelection, TxFieldSelection, TraceRequest, TxRequest
},
};
use async_stream::try_stream;
Expand Down Expand Up @@ -115,13 +115,50 @@ impl DataSource for ArchiveDataSource {
.map(|r| LogRequest {
address: r.address,
topic0: r.topic0,
transaction: true,
transaction_traces: true,
transaction: r.transaction,
transaction_traces: r.transaction_traces,
})
.collect();
Some(logs)
};

let transactions = if request.transactions.is_empty() {
None
} else {
fields.transaction = Some(TxFieldSelection {
cumulative_gas_used: true,
effective_gas_price: true,
from: true,
gas: true,
gas_price: true,
gas_used: true,
input: true,
max_fee_per_gas: true,
max_priority_fee_per_gas: true,
nonce: true,
r: true,
s: true,
hash: true,
status: true,
to: true,
transaction_index: true,
r#type: true,
v: true,
value: true,
y_parity: true,
});
let transactions = request
.transactions
.into_iter()
.map(|r| TxRequest {
to: r.address,
sighash: r.sighash,
traces: r.traces,
})
.collect();
Some(transactions)
};

let traces = if request.traces.is_empty() {
None
} else {
Expand Down Expand Up @@ -171,8 +208,8 @@ impl DataSource for ArchiveDataSource {
.map(|r| TraceRequest {
call_to: r.address,
call_sighash: r.sighash,
transaction: true,
parents: true,
transaction: r.transaction,
parents: r.parents,
})
.collect();
Some(traces)
Expand All @@ -183,6 +220,7 @@ impl DataSource for ArchiveDataSource {
to_block: request.to,
fields: Some(fields),
logs,
transactions,
traces,
};

Expand Down
34 changes: 30 additions & 4 deletions src/firehose.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::cursor::Cursor;
use crate::datasource::{
Block, BlockHeader, CallType, DataRequest, DataSource, HashAndHeight, HotDataSource, Log,
LogRequest, Trace, TraceResult, TraceType, Transaction, TraceRequest,
LogRequest, Trace, TraceResult, TraceType, Transaction, TraceRequest, TxRequest,
};
use crate::pbcodec;
use crate::pbfirehose::single_block_request::Reference;
Expand Down Expand Up @@ -174,6 +174,7 @@ impl Firehose {
from: max(state.next_block(), start_block),
to: to_block,
logs: logs.clone(),
transactions: vec![],
traces: traces.clone(),
};
let mut stream = Pin::from(archive.get_finalized_blocks(req, rpc.is_some())?);
Expand Down Expand Up @@ -219,6 +220,7 @@ impl Firehose {
from: max(state.next_block(), start_block),
to: Some(to),
logs: logs.clone(),
transactions: vec![],
traces: traces.clone(),
};
let mut stream = Pin::from(rpc.get_finalized_blocks(req, true)?);
Expand Down Expand Up @@ -254,6 +256,7 @@ impl Firehose {
from: max(state.next_block(), start_block),
to: to_block,
logs,
transactions: vec![],
traces,
};
let mut last_head: HashAndHeight = state.into();
Expand Down Expand Up @@ -310,6 +313,10 @@ impl Firehose {
}

pub async fn block(&self, request: SingleBlockRequest) -> anyhow::Result<SingleBlockResponse> {
if !request.transforms.is_empty() {
anyhow::bail!("trasnforms aren't supported in SingleBlockRequest")
}

let block_num = match request.reference.as_ref().unwrap() {
Reference::BlockNumber(block_number) => block_number.num,
Reference::BlockHashAndNumber(block_hash_and_number) => block_hash_and_number.num,
Expand All @@ -322,11 +329,26 @@ impl Firehose {
let req = DataRequest {
from: block_num,
to: Some(block_num),
logs: vec![],
traces: vec![],
logs: vec![LogRequest::default()],
transactions: vec![TxRequest::default()],
traces: vec![TraceRequest::default()],
};

let mut stream = Pin::from(self.archive.get_finalized_blocks(req, true)?);
let archive_height = self.archive.get_finalized_height().await?;
let mut stream = if block_num <= archive_height {
Pin::from(self.archive.get_finalized_blocks(req, true)?)
} else {
if let Some(rpc) = &self.rpc {
let rpc_height = rpc.get_finalized_height().await?;
if block_num <= rpc_height {
Pin::from(self.archive.get_finalized_blocks(req, true)?)
} else {
anyhow::bail!("block isn't found")
}
} else {
anyhow::bail!("block isn't found")
}
};
let blocks = stream.next().await.unwrap()?;
let block = blocks.into_iter().nth(0).unwrap();

Expand Down Expand Up @@ -354,6 +376,8 @@ impl From<LogFilter> for LogRequest {
.into_iter()
.map(|signature| prefix_hex::encode(signature))
.collect(),
transaction: true,
transaction_traces: true,
}
}
}
Expand All @@ -371,6 +395,8 @@ impl From<CallToFilter> for TraceRequest {
.into_iter()
.map(|signature| prefix_hex::encode(signature))
.collect(),
transaction: true,
parents: true,
}
}
}
Expand Down

0 comments on commit c634513

Please sign in to comment.