Skip to content
This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

Move bench files #8

Merged
merged 6 commits into from
Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
toolchain:
- stable
- beta
- nightly
# - nightly
steps:
- uses: actions/checkout@v3
- run: rustup update ${{ matrix.toolchain }} && rustup default ${{ matrix.toolchain }}
Expand Down
84 changes: 73 additions & 11 deletions Readme.md
Original file line number Diff line number Diff line change
@@ -1,25 +1,87 @@
# Flat files decoder for firehose

## Usage
[![CI status](https://github.com/semiotic-ai/flat-files-decoder/workflows/ci/badge.svg)][gh-ci]

this crate is designed to decompress and decode headers from [binary files, which are called flat files,](https://github.com/streamingfast/firehose-ethereum/blob/develop/proto/sf/ethereum/type/v2/type.proto) generated from Firehose. Flat files store all information necessary to reconstruct the transaction and receipt tries. It also checks the validity of receipt roots and transaction roots present in the block headers by recalculating them via the block body data. Details of the implementation can be found [here](https://github.com/streamingfast/dbin?tab=readme-ov-file).
This check ensures that receipt logs and transaction data stored in the flat files are internally consistent with the block headers also stored in the flat files.

This tool was first presented as a mean to enhance the performance and verifiability of The Graph protocol. However,
it turns out it could be used as a solution for EIP-4444 problem of full nodes stopping to provide historical data over one year.
The idea is that the flat files that this crate can decode could also be used as an archival format similar to era1 files if the
transaction and receipt data stored in the flat files can be verified to be part of the Ethereum canonical history.

For more information, read our ethresear.ch [here](https://ethresear.ch/t/using-the-graph-to-preserve-historical-data-and-enable-eip-4444/17318) and our pre-merge
solution implementation [here](https://github.com/semiotic-ai/header_accumulator) to see how we can verify that the flat files are consistent with Ethereum's canonical chain.

## Getting Started

### Prerequisites
- Rust installed
- Cargo installed
- [protoc installed](https://grpc.io/docs/protoc-installation/)
- [Rust (stable)](https://www.rust-lang.org/tools/install)
- Cargo (Comes with Rust by default)
- [protoc](https://grpc.io/docs/protoc-installation/)
- Firehose dbin files to decode
- An example file is provided `example0017686312.dbin`

### Running
- Run `cargo run --release` in the root directory of the project
- The program will decode all files in the input_files directory
- It will verify the receipt root & transaction root matches the computed one for all blocks
## Running

### Commands

The tool provides the following commands for various operations:

- `stream`: Stream data continuously.
- `decode`: Decode files from input to output.
- `help`: Print this message or the help of the given subcommand(s).

### Options
- `--input <path>`: Specify a directory or single file to read from (default: input_files)
- `--output <path>`: Specify a directory to write output files to (if missing it will not write to disk)

### Benchmarks
You can use the following options with the commands for additional functionalities:

- `-h, --help`: Print help information about specific command and options.
- `-V, --version`: Print the version information of the tool.


#### NOTICE: either streaming or reading from directory it will verify the receipt root & transaction root matches the computed one for all blocks

## Usage Examples

Here are some examples of how to use the commands:

1. To stream data continuously from `stdin`:

```bash
# simply turning on stream stdin reading
cargo run stream

# or from files into stdin
cat example0017686312.dbin | cargo run stream
```

This will output decoded header records as bytes into `stdout`

2. To check a folder of dbin files:

```bash
cargo run decode --input ./input_files/
```

This will store the block headers as json format in the output folder.
By passing `--headers-dir` a folder of assumed valid block headers can be provided to compare
with the input flat files. Valid headers can be pulled from the [sync committee subprotocol](https://github.com/ethereum/annotated-spec/blob/master/altair/sync-protocol.md) for post-merge data.

**NOTICE:**For pre-merge data another approach using the [header accumulator](https://github.com/ethereum/portal-network-specs/blob/8ad5bc33cb0d4485d2eab73bf2decc43e7566a8f/history-network.md#the-header-accumulator) is necessary since
sync committees will not provide these headers.

## Goals

Our goal is to provide The Graph's Indexers the tools to trustlessly share flat files with cryptographic guarantees
that the data in the flat files is part of the canonical history of the Ethereum blockchain,
enabling Indexers to quickly sync all historical data and begin serving data with minimal effort.

## Benchmarking
- Run `cargo bench` in the root directory of the project
- Benchmark results will be output to the terminal
- Benchmark time includes reading from disk & writing output to disk
- Results can be found in `target/criterion/report/index.html`

For proper benchmarking of future improvements, fixes and features please compare baselines.
Refer to [the end of this section of Criterion documentation](https://bheisler.github.io/criterion.rs/book/user_guide/command_line_options.html) for more information on creating and comparing baselines.
2 changes: 1 addition & 1 deletion benches/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn bench(c: &mut Criterion) {
group.sample_size(ITERS_PER_FILE);

group.bench_function("handle-flat-file", |b| {
let files = fs::read_dir("benchmark_files").expect("Failed to read dir");
let files = fs::read_dir("tests/benchmark_files").expect("Failed to read dir");
for file in files {
let path = file.expect("Failed to get path").path();
match path.extension() {
Expand Down
10 changes: 5 additions & 5 deletions benches/stream_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ fn read_decode_check_bench(c: &mut Criterion) {
group.sample_size(ITERS_PER_FILE);

group.bench_function("read-message-stream", |b| {
let files = fs::read_dir("benchmark_files/pre_merge").expect("Failed to read dir");
let files = fs::read_dir("tests/benchmark_files/pre_merge").expect("Failed to read dir");
for file in files {
let path = file.expect("Failed to get path").path();
match path.extension() {
Expand Down Expand Up @@ -50,7 +50,7 @@ fn read_decode_check_bench(c: &mut Criterion) {
});

group.bench_function("decode-bstream", |b| {
let files = fs::read_dir("benchmark_files/pre_merge").expect("Failed to read dir");
let files = fs::read_dir("tests/benchmark_files/pre_merge").expect("Failed to read dir");
for file in files {
let path = file.expect("Failed to get path").path();
match path.extension() {
Expand Down Expand Up @@ -78,7 +78,7 @@ fn read_decode_check_bench(c: &mut Criterion) {
});

group.bench_function("decode-block", |b| {
let files = fs::read_dir("benchmark_files/pre_merge").expect("Failed to read dir");
let files = fs::read_dir("tests/benchmark_files/pre_merge").expect("Failed to read dir");
for file in files {
let path = file.expect("Failed to get path").path();
match path.extension() {
Expand Down Expand Up @@ -110,7 +110,7 @@ fn read_decode_check_bench(c: &mut Criterion) {
});

group.bench_function("receipts-check", |b| {
let files = fs::read_dir("benchmark_files/pre_merge").expect("Failed to read dir");
let files = fs::read_dir("tests/benchmark_files/pre_merge").expect("Failed to read dir");
for file in files {
let path = file.expect("Failed to get path").path();
match path.extension() {
Expand Down Expand Up @@ -142,7 +142,7 @@ fn read_decode_check_bench(c: &mut Criterion) {
});

group.bench_function("transactions-check", |b| {
let files = fs::read_dir("benchmark_files/pre_merge").expect("Failed to read dir");
let files = fs::read_dir("tests/benchmark_files/pre_merge").expect("Failed to read dir");
for file in files {
let path = file.expect("Failed to get path").path();
match path.extension() {
Expand Down
6 changes: 3 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub enum DecodeInput {
Path(String),
Reader(Box<dyn Read>),
}

/**
* Decode & verify flat files from a directory or a single file.
* Input can be a directory or a file.
Expand Down Expand Up @@ -210,9 +211,8 @@ pub fn extract_blocks<R: Read>(mut reader: R) -> Result<Vec<Block>, DecodeError>
///
/// # Arguments
///
/// * `end_block`: Header Accumulator solution is expensive. For blocks after the merge,
/// Ethereum consensus should be used in this scenario. This zis why the default block
/// for this variable is the MERGE_BLOCK (block 15537393)
/// * `end_block`: For blocks after the merge, Ethereum sync committee should be used. This is why the default block
/// for this param is the MERGE_BLOCK (block 15537393)
/// * `reader`: where bytes are read from
/// * `writer`: where bytes written to
pub async fn stream_blocks<R: Read, W: Write>(
Expand Down
6 changes: 6 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,23 @@ struct Cli {
enum Commands {
/// Stream data continuously
Stream {
/// decompress .dibn files if they are compressed with zstd
#[clap(short, long, default_value = "false")]
decompress: bool,
/// the block to end streaming
#[clap(short, long)]
end_block: Option<usize>,
},
/// Decode files from input to output
Decode {
/// input folder where flat files are stored
#[clap(short, long)]
input: String,
#[clap(long)]
/// folder where valid headers are stored so decoded blocks can be validated against
/// their headers.
headers_dir: Option<String>,
/// output folder where decoded headers will be stored as .json
#[clap(short, long)]
output: Option<String>,
},
Expand Down
36 changes: 35 additions & 1 deletion src/receipts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ use reth_primitives::proofs::ordered_trie_root_with_encoder;
use reth_rlp::Encodable;
use revm_primitives::B256;

const BYZANTINUM_FORK_BLOCK: u64 = 4_370_000;

/// Verifies the receipt root in a given block's header against a
/// computed receipt root from the block's body.
///
/// # Arguments
///
/// * `block` reference to the block which the root will be verified
pub fn check_receipt_root(block: &Block) -> Result<(), ReceiptError> {
let computed_root = calc_receipt_root(block)?;
let receipt_root = match block.header {
Expand All @@ -28,6 +36,13 @@ pub fn check_receipt_root(block: &Block) -> Result<(), ReceiptError> {
Ok(())
}

/// Calculates the trie receipt root of a given block recepits
///
/// It uses the traces to aggregate receipts from blocks
///
/// # Arguments
///
/// * `block` reference to the block which the root will be verified
fn calc_receipt_root(block: &Block) -> Result<B256, ReceiptError> {
let mut receipts = Vec::new();

Expand All @@ -40,8 +55,26 @@ fn calc_receipt_root(block: &Block) -> Result<B256, ReceiptError> {
Ok(ordered_trie_root_with_encoder(&receipts, encoder))
}

/// Encodes full rceipts using [RLP serialization](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp)
///
/// For blocks before the Byzantium fork, it uses a specific RLP encoding that includes the receipt's header length values, state root,
severiano-sisneros marked this conversation as resolved.
Show resolved Hide resolved
/// cumulative gas used, bloom filter, and logs.
/// For blocks at or after the Byzantium fork, it encodes the receipt's inner contents without the header.
///
/// This function is useful for computing the trie root hash which in reth needs to be rlp encoded.
///
/// # Arguments
///
/// * `block` reference to the [`Block`] where [`FullReceipt`] will be extracted from
///
/// # Returns
///
/// a function that takes a refenrece to a [`FullReceipt`],
/// and a mutable reference to a type implementing the [`BufMut`].
/// All the data from the receipts in written into the `BufMut` buffer

fn get_encoder(block: &Block) -> fn(&FullReceipt, &mut dyn BufMut) {
if block.number >= 4_370_000 {
if block.number >= BYZANTINUM_FORK_BLOCK {
|r: &FullReceipt, out: &mut dyn BufMut| r.receipt.encode_inner(out, false)
} else {
|r: &FullReceipt, out: &mut dyn BufMut| {
Expand All @@ -54,6 +87,7 @@ fn get_encoder(block: &Block) -> fn(&FullReceipt, &mut dyn BufMut) {
}
}

/// Encodes receipt header using [RLP serialization](https://ethereum.org/en/developers/docs/data-structures-and-encoding/rlp)
fn receipt_rlp_header(receipt: &FullReceipt) -> reth_rlp::Header {
let payload_length = receipt.state_root.as_slice().length()
+ receipt.receipt.receipt.cumulative_gas_used.length()
Expand Down
Loading