diff --git a/Cargo.lock b/Cargo.lock index 131af377..26529440 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2242,18 +2242,17 @@ dependencies = [ "bincode", "clap", "criterion", + "env_logger 0.11.5", "firehose-protos", + "insta", "prost", "prost-build", "prost-wkt-build", "rand", - "rayon", "reth-primitives", "serde", "serde_json", "thiserror", - "tokio", - "tokio-test", "tracing", "tracing-subscriber", "zstd", @@ -3030,6 +3029,7 @@ dependencies = [ "console", "lazy_static", "linked-hash-map", + "serde", "similar", ] @@ -5923,19 +5923,6 @@ dependencies = [ "tokio-util", ] -[[package]] -name = "tokio-test" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" -dependencies = [ - "async-stream", - "bytes", - "futures-core", - "tokio", - "tokio-stream", -] - [[package]] name = "tokio-util" version = "0.7.12" diff --git a/Cargo.toml b/Cargo.toml index 4cbb7e07..c75c2a18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ fake = "2.10.0" futures = "0.3.31" hex = "0.4.3" http = "1.1.0" -insta = "1.39.0" +insta = "1.41.1" log = "0.4.20" merkle_proof = { git = "https://github.com/semiotic-ai/lighthouse.git", branch = "stable" } object_store = { version = "0.11.1", features = ["gcp", "http", "aws"] } @@ -35,7 +35,6 @@ prost-wkt = "0.6.0" prost-wkt-types = "0.6.0" prost-wkt-build = "0.6.0" rand = "0.8.5" -rayon = "1.8.0" reqwest = { version = "0.12.8", features = ["json"] } reth-primitives = { git = "https://github.com/paradigmxyz/reth", version = "1.1.0", tag = "v1.1.0" } reth-trie-common = { git = "https://github.com/paradigmxyz/reth", version = "1.1.0", tag = "v1.1.0" } @@ -50,7 +49,6 @@ tempfile = "3.0" thiserror = "1.0.63" tokio = "1.39.2" tokio-stream = "0.1.16" -tokio-test = "0.4.3" tonic = "0.12.0" tonic-build = "0.12.0" tracing = "0.1.40" diff --git a/crates/flat-files-decoder/Cargo.toml b/crates/flat-files-decoder/Cargo.toml index 288e06be..7d5caf35 100644 --- a/crates/flat-files-decoder/Cargo.toml +++ b/crates/flat-files-decoder/Cargo.toml @@ -7,32 +7,27 @@ edition = "2021" name = "flat_files_decoder" path = "src/lib.rs" -[[bin]] -name = "flat-files-decoder" -path = "src/main.rs" - [dependencies] alloy-primitives.workspace = true alloy-consensus.workspace = true alloy-eip2930.workspace = true bincode.workspace = true -clap.workspace = true firehose-protos = { path = "../firehose-protos" } prost.workspace = true -rand.workspace = true -rayon.workspace = true reth-primitives.workspace = true serde = { workspace = true, features = ["derive"] } serde_json.workspace = true thiserror.workspace = true -tokio = { workspace = true, features = ["full"] } tracing.workspace = true -tracing-subscriber = { workspace = true, features = ["json", "env-filter"] } zstd.workspace = true [dev-dependencies] +clap.workspace = true criterion.workspace = true -tokio-test.workspace = true +env_logger.workspace = true +insta = { workspace = true, features = ["json"] } +rand.workspace = true +tracing-subscriber = { workspace = true, features = ["json", "env-filter"] } [[bench]] name = "decoder" diff --git a/crates/flat-files-decoder/README.md b/crates/flat-files-decoder/README.md new file mode 100644 index 00000000..145f25c8 --- /dev/null +++ b/crates/flat-files-decoder/README.md @@ -0,0 +1,60 @@ +# Flat Files Decoder + +## Running CLI Example + +### Commands + +The tool provides the following commands for various operations: + +- `stream`: Stream data continuously. +- `decode`: Decode files from input to output. +- `help`: Print this message or the help of the given subcommand(s). + +### Options + +You can use the following options with the commands for additional functionalities: + +- `-h, --help`: Print help information about specific command and options. +- `-V, --version`: Print the version information of the tool. + +### Usage Examples + +Here are some examples of how to use the commands: + +1. To stream data continuously from `stdin`: + +```terminal +cargo run -p flat-files-decoder --example cli stream +``` + +```terminal +cat example0017686312.dbin | cargo run -p flat-files-decoder --example cli stream +``` + +This will output decoded header records as bytes into `stdout` + +1. To check a folder of dbin files: + +```terminal +cargo run -p flat-files-decoder --example cli decode --input ./input_files/ --compression true +``` + +So, if using test data from a `test-assets/` folder in the root of the `veemon` repo: + +```terminal +cargo run -p flat-files-decoder --example cli decode --input test-assets/benchmark_files/pre_merge +``` + +This will store the block headers as json format in the output folder. +By passing `--headers-dir` a folder of assumed valid block headers can be provided to compare +with the input flat files. Valid headers can be pulled from the [sync committee subprotocol](https://github.com/ethereum/annotated-spec/blob/master/altair/sync-protocol.md) for post-merge data. + +## Benchmarking + +- Run `cargo bench` in the root directory of the project +- Benchmark results will be output to the terminal +- Benchmark time includes reading from disk & writing output to disk +- Results can be found in `target/criterion/report/index.html` + +For proper benchmarking of future improvements, fixes and features please compare baselines. +Refer to [the end of this section of Criterion documentation](https://bheisler.github.io/criterion.rs/book/user_guide/command_line_options.html) for more information on creating and comparing baselines. diff --git a/crates/flat-files-decoder/Readme.md b/crates/flat-files-decoder/Readme.md deleted file mode 100644 index 5761cfb8..00000000 --- a/crates/flat-files-decoder/Readme.md +++ /dev/null @@ -1,90 +0,0 @@ -# Flat files decoder for firehose - -[![CI status](https://github.com/semiotic-ai/flat-files-decoder/workflows/ci/badge.svg)][gh-ci] - -this crate is designed to decompress and decode headers from [binary files, which are called flat files,](https://github.com/streamingfast/firehose-ethereum/blob/develop/proto/sf/ethereum/type/v2/type.proto) generated from Firehose. Flat files store all information necessary to reconstruct the transaction and receipt tries. It also checks the validity of receipt roots and transaction roots present in the block headers by recalculating them via the block body data. Details of the implementation can be found [here](https://github.com/streamingfast/dbin?tab=readme-ov-file). -This check ensures that receipt logs and transaction data stored in the flat files are internally consistent with the block headers also stored in the flat files. - -This tool was first presented as a mean to enhance the performance and verifiability of The Graph protocol. However, -it turns out it could be used as a solution for EIP-4444 problem of full nodes stopping to provide historical data over one year. -The idea is that the flat files that this crate can decode could also be used as an archival format similar to era1 files if the -transaction and receipt data stored in the flat files can be verified to be part of the Ethereum canonical history. - -For more information, read our ethresear.ch [here](https://ethresear.ch/t/using-the-graph-to-preserve-historical-data-and-enable-eip-4444/17318) and our pre-merge -solution implementation [here](https://github.com/semiotic-ai/header_accumulator) to see how we can verify that the flat files are consistent with Ethereum's canonical chain. - -## Getting Started - -### Prerequisites -- [Rust (stable)](https://www.rust-lang.org/tools/install) -- Cargo (Comes with Rust by default) -- [protoc](https://grpc.io/docs/protoc-installation/) -- Firehose dbin files to decode - - An example file is provided `example0017686312.dbin` - -## Running - -### Commands - -The tool provides the following commands for various operations: - -- `stream`: Stream data continuously. -- `decode`: Decode files from input to output. -- `help`: Print this message or the help of the given subcommand(s). - -### Options - -You can use the following options with the commands for additional functionalities: - -- `-h, --help`: Print help information about specific command and options. -- `-V, --version`: Print the version information of the tool. - - -#### NOTICE: either streaming or reading from directory it will verify the receipt root & transaction root matches the computed one for all blocks - -## Usage Examples - -Here are some examples of how to use the commands: - -1. To stream data continuously from `stdin`: - - ```bash - # simply turning on stream stdin reading - cargo run stream - - # or from files into stdin - cat example0017686312.dbin | cargo run stream - ``` - -This will output decoded header records as bytes into `stdout` - -2. To check a folder of dbin files: - -```bash -cargo run decode --input ./input_files/ -``` - -This will store the block headers as json format in the output folder. -By passing `--headers-dir` a folder of assumed valid block headers can be provided to compare -with the input flat files. Valid headers can be pulled from the [sync committee subprotocol](https://github.com/ethereum/annotated-spec/blob/master/altair/sync-protocol.md) for post-merge data. - -**NOTICE:**For pre-merge data another approach using the [header accumulator](https://github.com/ethereum/portal-network-specs/blob/8ad5bc33cb0d4485d2eab73bf2decc43e7566a8f/history-network.md#the-header-accumulator) is necessary since -sync committees will not provide these headers. - -## Goals - -Our goal is to provide The Graph's Indexers the tools to trustlessly share flat files with cryptographic guarantees -that the data in the flat files is part of the canonical history of the Ethereum blockchain, -enabling Indexers to quickly sync all historical data and begin serving data with minimal effort. - -## Benchmarking -- Run `cargo bench` in the root directory of the project -- Benchmark results will be output to the terminal -- Benchmark time includes reading from disk & writing output to disk -- Results can be found in `target/criterion/report/index.html` - -For proper benchmarking of future improvements, fixes and features please compare baselines. -Refer to [the end of this section of Criterion documentation](https://bheisler.github.io/criterion.rs/book/user_guide/command_line_options.html) for more information on creating and comparing baselines. - -## Testing -Generate code coverage reports with `cargo llvm-cov --html` and open them with `open ./target/llvm-cov/html/index.html`. \ No newline at end of file diff --git a/crates/flat-files-decoder/benches/decoder.rs b/crates/flat-files-decoder/benches/decoder.rs index 2c08fb25..39259e7e 100644 --- a/crates/flat-files-decoder/benches/decoder.rs +++ b/crates/flat-files-decoder/benches/decoder.rs @@ -1,8 +1,11 @@ extern crate rand; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use flat_files_decoder::{decoder::handle_file, decompression::Decompression}; -use std::fs; +use flat_files_decoder::read_blocks_from_reader; +use std::{ + fs::{self, File}, + io::BufReader, +}; const ITERS_PER_FILE: usize = 10; @@ -23,7 +26,10 @@ fn bench(c: &mut Criterion) { } } - b.iter(|| handle_file(black_box(&path), None, None, Decompression::None)); + b.iter(|| { + let reader = BufReader::new(File::open(path.as_os_str()).unwrap()); + read_blocks_from_reader(black_box(reader), false.into()) + }); } }); diff --git a/crates/flat-files-decoder/benches/stream_blocks.rs b/crates/flat-files-decoder/benches/stream_blocks.rs index a0b1aad3..2200e133 100644 --- a/crates/flat-files-decoder/benches/stream_blocks.rs +++ b/crates/flat-files-decoder/benches/stream_blocks.rs @@ -4,7 +4,7 @@ use std::{ }; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use flat_files_decoder::{dbin::DbinFile, error::DecoderError}; +use flat_files_decoder::read_block_from_reader; use prost::Message; const ITERS_PER_FILE: usize = 10; @@ -27,16 +27,16 @@ fn read_decode_check_bench(c: &mut Criterion) { } let file = File::open(&path).expect("Failed to open file"); let mut reader = BufReader::new(file); - let mut message: Result, DecoderError> = Err(DecoderError::InvalidDbinBytes); + loop { + let mut message: Result, _> = Ok(Vec::new()); + b.iter(|| { - message = black_box(DbinFile::read_message_stream(&mut reader)); + message = black_box(read_block_from_reader(&mut reader)); }); - match message { - Ok(_) => continue, - Err(_) => { - break; - } + + if message.is_err() { + break; } } } @@ -57,7 +57,7 @@ fn read_decode_check_bench(c: &mut Criterion) { let file = File::open(&path).expect("Failed to open file"); let mut reader = BufReader::new(file); loop { - let message = match DbinFile::read_message_stream(&mut reader) { + let message = match read_block_from_reader(&mut reader) { Ok(message) => message, Err(_) => { break; @@ -88,7 +88,7 @@ fn read_decode_check_bench(c: &mut Criterion) { let file = File::open(&path).expect("Failed to open file"); let mut reader = BufReader::new(file); loop { - let message = match DbinFile::read_message_stream(&mut reader) { + let message = match read_block_from_reader(&mut reader) { Ok(message) => message, Err(_) => { break; @@ -121,7 +121,7 @@ fn read_decode_check_bench(c: &mut Criterion) { let file = File::open(&path).expect("Failed to open file"); let mut reader = BufReader::new(file); loop { - let message = match DbinFile::read_message_stream(&mut reader) { + let message = match read_block_from_reader(&mut reader) { Ok(message) => message, Err(_) => { break; @@ -155,7 +155,7 @@ fn read_decode_check_bench(c: &mut Criterion) { let file = File::open(&path).expect("Failed to open file"); let mut reader = BufReader::new(file); loop { - let message = match DbinFile::read_message_stream(&mut reader) { + let message = match read_block_from_reader(&mut reader) { Ok(message) => message, Err(_) => { break; diff --git a/crates/flat-files-decoder/examples/cli.rs b/crates/flat-files-decoder/examples/cli.rs new file mode 100644 index 00000000..3a046fe0 --- /dev/null +++ b/crates/flat-files-decoder/examples/cli.rs @@ -0,0 +1,321 @@ +use std::{ + fs::{self, DirEntry, File}, + io::{self, BufReader, BufWriter, Write}, + process::ExitCode, +}; + +use alloy_primitives::B256; +use clap::{Parser, Subcommand}; +use firehose_protos::ethereum_v2::{Block, BlockHeader}; +use flat_files_decoder::{ + read_blocks_from_reader, stream_blocks, Compression, DecoderError, Reader, +}; +use serde::{Deserialize, Serialize}; +use tracing::{error, info, level_filters::LevelFilter, subscriber::set_global_default, trace}; +use tracing_subscriber::{EnvFilter, FmtSubscriber}; + +fn main() -> ExitCode { + init_tracing(); + if let Err(e) = run() { + error!("Decoder error: {e}"); + return ExitCode::from(1); + } + ExitCode::SUCCESS +} + +fn init_tracing() { + let filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + let subscriber_builder: tracing_subscriber::fmt::SubscriberBuilder< + tracing_subscriber::fmt::format::DefaultFields, + tracing_subscriber::fmt::format::Format, + EnvFilter, + > = FmtSubscriber::builder().with_env_filter(filter); + set_global_default(subscriber_builder.with_ansi(true).pretty().finish()).expect( + "Failed to set up the global default subscriber for logging. Please check if the RUST_LOG environment variable is set correctly.", + ); +} + +#[derive(Parser, Debug)] +#[clap(author, version, about, long_about = None)] +struct Cli { + #[clap(subcommand)] + command: Commands, +} + +#[derive(Subcommand, Debug)] +enum Commands { + /// Decodes files from an input folder and can save them to an output folder + Decode { + /// Path to the input folder containing flat files + #[clap(short, long)] + input: String, + + /// Optional path to a folder containing headers for validating decoded blocks + #[clap(long)] + headers_dir: Option, + + /// Optional path to an output folder for saving decoded headers as .json files + #[clap(short, long)] + output: Option, + + /// Enables decompression for zstd-compressed flat files + #[clap(short, long, default_value = "false")] + compression: Compression, + }, + + /// Stream data continuously + Stream { + /// Decompresses .dbin files if they are zstd-compressed + #[clap(short, long, default_value = "false")] + compression: Compression, + + /// Block number to end the streaming process + #[clap(short, long)] + end_block: Option, + }, +} + +fn run() -> Result<(), DecoderError> { + use Commands::*; + + let cli = Cli::parse(); + + match cli.command { + Stream { + compression, + end_block, + } => { + let mut blocks = stream_blocks(Reader::StdIn(compression), end_block.into())?; + + let mut writer = BufWriter::new(io::stdout().lock()); + + while let Some(block) = blocks.next() { + let header_record_with_number = HeaderRecordWithNumber::try_from(&block)?; + let header_record_bin = bincode::serialize(&header_record_with_number)?; + + let size = header_record_bin.len() as u32; + writer.write_all(&size.to_be_bytes())?; + writer.write_all(&header_record_bin)?; + writer.flush()?; + } + + Ok(()) + } + Decode { + input, + headers_dir, + output, + compression, + } => { + let blocks = decode_flat_files( + &input, + output.as_deref(), + headers_dir.as_deref(), + compression, + )?; + + info!("Total blocks: {}", blocks.len()); + + Ok(()) + } + } +} + +/// Decodes and optionally verifies block flat files from a given directory or single file. +/// +/// This function processes input which can be a file or a directory containing multiple `.dbin` files. +/// If `headers_dir` is provided, it verifies the block headers against the files found in this directory. +/// These header files must be in JSON format and named after the block number they represent (e.g., `block-.json`). +/// it can also handle `zstd` compressed flat files. +/// +/// # Arguments +/// +/// * `input_path`: A [`String`] specifying the path to the input directory or file. +/// * `output_path`: An [`Option<&str>`] specifying the directory where decoded blocks should be written. +/// If `None`, decoded blocks are not written to disk. +/// * `json_headers_dir`: An [`Option<&str>`] specifying the directory containing header files for verification. +/// Must be a directory if provided. +/// * `compression`: A [`Compression`] enum specifying if it is necessary to decompress from zstd. +fn decode_flat_files( + input_path: &str, + output_path: Option<&str>, + json_headers_dir: Option<&str>, + compression: Compression, +) -> Result, DecoderError> { + let metadata = fs::metadata(input_path)?; + + // Get blocks depending on file or folder + let blocks = if metadata.is_dir() { + info!("Processing directory: {}", input_path); + read_flat_files(input_path, compression) + } else { + info!("Processing file: {}", input_path); + read_flat_file(input_path, compression) + }?; + + if let Some(json_headers_dir) = json_headers_dir { + for block in blocks.iter() { + check_block_against_json(block, json_headers_dir)?; + } + } + + if let Some(path) = output_path { + fs::create_dir_all(path)?; + for block in blocks.iter() { + write_block_to_json(block, path)?; + } + } + + Ok(blocks) +} + +fn create_read_dir(input_path: &str) -> io::Result { + fs::read_dir(input_path) +} + +fn check_block_against_json(block: &Block, headers_dir: &str) -> Result<(), DecoderError> { + let header_file_path = format!("{}/{}.json", headers_dir, block.number); + let header_file = File::open(header_file_path)?; + let header_roots: BlockHeaderRoots = serde_json::from_reader(header_file)?; + + if !header_roots.block_header_matches(block) { + return Err(DecoderError::MatchRootsFailed { + block_number: block.number, + }); + } + + Ok(()) +} + +fn write_block_to_json(block: &Block, output: &str) -> Result<(), DecoderError> { + let file_name = format!("{}/block-{}.json", output, block.number); + let mut out_file = File::create(file_name)?; + + let block_json = serde_json::to_string(&block)?; + + out_file.write_all(block_json.as_bytes())?; + + Ok(()) +} + +/// Decodes and verifies block flat files from a single file. +/// +/// This function decodes and verifies blocks contained within flat files. +/// Additionally, the function supports handling `zstd` compressed flat files if decompression is required. +fn read_flat_file(path: &str, compression: Compression) -> Result, DecoderError> { + let reader = BufReader::new(File::open(path)?); + + let blocks = read_blocks_from_reader(reader, compression)?; + + Ok(blocks) +} + +/// Dbin file type extension +const EXTENSION: &str = "dbin"; + +/// Checks if the file extension is `.dbin`. +fn dir_entry_extension_is_dbin(entry: &DirEntry) -> bool { + entry + .path() + .extension() + .map_or(false, |ext| ext == EXTENSION) +} + +fn read_flat_files(path: &str, compression: Compression) -> Result, DecoderError> { + let read_dir = create_read_dir(path)?; + + let mut blocks: Vec = vec![]; + + for path in read_dir { + let path = path?; + + if !dir_entry_extension_is_dbin(&path) { + continue; + } + + trace!("Processing file: {}", path.path().display()); + + match read_flat_file(path.path().to_str().unwrap(), compression) { + Ok(blocks_vec) => { + blocks.extend(blocks_vec); + } + Err(e) => { + return Err(e); + } + } + } + + Ok(blocks) +} + +/// A struct to hold the block hash, block number, and total difficulty of a block. +#[derive(Serialize, Deserialize)] +struct HeaderRecordWithNumber { + block_hash: Vec, + block_number: u64, + total_difficulty: Vec, +} + +impl TryFrom<&Block> for HeaderRecordWithNumber { + type Error = DecoderError; + + fn try_from(block: &Block) -> Result { + Ok(HeaderRecordWithNumber { + block_hash: block.hash.clone(), + block_number: block.number, + total_difficulty: block + .header()? + .total_difficulty + .as_ref() + .ok_or(Self::Error::TotalDifficultyInvalid)? + .bytes + .clone(), + }) + } +} + +/// A struct to hold the receipt and transactions root for a `Block`. +/// This struct is used to compare the receipt and transactions roots of a block +/// with the receipt and transactions roots of another block. +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +struct BlockHeaderRoots { + receipt_root: B256, + transactions_root: B256, +} + +impl TryFrom<&Block> for BlockHeaderRoots { + type Error = DecoderError; + + fn try_from(block: &Block) -> Result { + block.header()?.try_into() + } +} + +impl TryFrom<&BlockHeader> for BlockHeaderRoots { + type Error = DecoderError; + + fn try_from(header: &BlockHeader) -> Result { + let receipt_root: [u8; 32] = header.receipt_root.as_slice().try_into()?; + let transactions_root: [u8; 32] = header.transactions_root.as_slice().try_into()?; + + Ok(Self { + receipt_root: receipt_root.into(), + transactions_root: transactions_root.into(), + }) + } +} + +impl BlockHeaderRoots { + /// Checks if the receipt and transactions roots of a block header match the receipt and transactions roots of another block. + fn block_header_matches(&self, block: &Block) -> bool { + match block.try_into() { + Ok(other) => self == &other, + Err(e) => { + error!("Failed to convert block to header roots: {e}"); + false + } + } + } +} diff --git a/crates/flat-files-decoder/src/cli.rs b/crates/flat-files-decoder/src/cli.rs deleted file mode 100644 index e0a2d09c..00000000 --- a/crates/flat-files-decoder/src/cli.rs +++ /dev/null @@ -1,39 +0,0 @@ -use clap::{Parser, Subcommand}; - -use crate::decompression::Decompression; - -#[derive(Parser, Debug)] -#[clap(author, version, about, long_about = None)] -pub struct Cli { - #[clap(subcommand)] - pub command: Commands, -} - -#[derive(Subcommand, Debug)] -pub enum Commands { - /// Stream data continuously - Stream { - /// decompress .dbin files if they are compressed with zstd - #[clap(short, long, default_value = "false")] - decompression: Decompression, - /// the block to end streaming - #[clap(short, long)] - end_block: Option, - }, - /// Decode files from input to output - Decode { - /// input folder where flat files are stored - #[clap(short, long)] - input: String, - #[clap(long)] - /// folder where valid headers are stored so decoded blocks can be validated against - /// their headers. - headers_dir: Option, - /// output folder where decoded headers will be stored as .json - #[clap(short, long)] - output: Option, - #[clap(short, long)] - /// optionally decompress zstd compressed flat files - decompression: Decompression, - }, -} diff --git a/crates/flat-files-decoder/src/dbin.rs b/crates/flat-files-decoder/src/dbin.rs index 0d8967a2..17d89ec5 100644 --- a/crates/flat-files-decoder/src/dbin.rs +++ b/crates/flat-files-decoder/src/dbin.rs @@ -1,153 +1,273 @@ -use std::io::Read; +use std::io::{self, Read}; use crate::error::DecoderError; -/// `DbinFile` is a struct that represents a simple file storage format to pack a stream of protobuf messages. It is defined by StreamingFast. +/// The bytes of a dbin file minus the header +type DbinMessages = Vec; + +/// The bytes of a dbin message +type DbinMessage = Vec; + +/// Each dbin message is length-prefixed as 4 bytes big-endian uint32 +const MAGIC_BYTES: &[u8; 4] = b"dbin"; + +/// The 4 magic bytes of a dbin file, indicating the file format +type MagicBytes = [u8; 4]; + +/// The size of the length prefix in bytes +const PREFIX_SIZE: usize = 4; + +/// The size of the header version in bytes +const HEADER_VERSION_SIZE: usize = 1; + +/// The size of the header content type in bytes +const HEADER_CONTENT_TYPE_SIZE: usize = 3; + +/// The size of the header content version in bytes +const HEADER_CONTENT_VERSION_SIZE: usize = 2; + +/// The supported version of the dbin file format +const SUPPORTED_DBIN_VERSION: u8 = 0; + +/// Work with a `.dbin` flat file. /// +/// Developed by StreamingFast, dbin is a simple file storage format to pack a stream of protobuffer messages. /// For more information, see [the dbin format documentation](https://github.com/streamingfast/dbin?tab=readme-ov-file). +#[derive(Debug)] pub struct DbinFile { - pub header: DbinHeader, - /// Rest of the bytes of the file, each message is length-prefixed as 4 bytes big-endian uin32 - pub messages: Vec>, -} - -/// `DbinHeader` contains the fields that compose the header of the .dbin file. -pub struct DbinHeader { - /// Next single byte after the 4 magic bytes, file format version - pub version: u8, - /// Next 3 bytes, content type like 'ETH', 'EOS', or something else - pub content_type: String, - /// Next 2 bytes, 10-based string representation of content version, ranges in '00'-'99' - pub content_version: String, + header: DbinHeader, + messages: DbinMessages, } impl DbinFile { - /// reads a DbinHeader - /// - /// It nests `read_partial_header` to read header. By itself, it reads the 4 magic bytes - fn read_header(read: &mut R) -> Result { - let mut buf: [u8; 4] = [0; 4]; + /// Get the content type of the `.dbin` file, such as `"ETH"`. + pub fn content_type(&self) -> &str { + &self.header.content_type + } - read.read_exact(&mut buf)?; + /// Read and parse a `.dbin` file from a `Read` source. + pub fn try_from_read(mut read: R) -> Result { + let header = DbinHeader::try_from_read(&mut read)?; + if !header.is_supported_version() { + return Err(DecoderError::VersionUnsupported); + } + let messages = Self::read_messages(&mut read)?; + Ok(Self { header, messages }) + } + + /// Reads messages from a `Read` source following the Dbin format. + fn read_messages(read: &mut R) -> Result { + let mut messages = Vec::new(); + + loop { + let bytes = match read_magic_bytes(read) { + Ok(bytes) => bytes, + // Break loop gracefully if EOF is reached at the start of a new message. + Err(DecoderError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => break, + Err(e) => return Err(e), + }; + + let message_length = u32::from_be_bytes(bytes) as usize; - if &buf != b"dbin" { - return Err(DecoderError::StartOfNewDbinFile); + match read_message(read, message_length) { + Ok(message) => messages.push(message), + // Return error if EOF occurs in the middle of a message + Err(DecoderError::Io(e)) if e.kind() == io::ErrorKind::UnexpectedEof => { + return Err(DecoderError::Io(e)) + } + Err(e) => return Err(e), + } } - let dbin_header = Self::read_partial_header(read)?; + Ok(messages) + } +} - Ok(dbin_header) +// implement iterator for DbinFile so that we can iterate over the messages +impl IntoIterator for DbinFile { + type Item = Vec; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.messages.into_iter() } +} - /// Reads all the fields that make a DbinHeader - fn read_partial_header(read: &mut R) -> Result { - let version; - let content_type; - let content_version; +/// Header of a `.dbin` file, containing metadata such as version, content type, and content version. +#[derive(Debug)] +struct DbinHeader { + /// File format version, the next single byte after the 4 [`DbinMagicBytes`] + version: u8, + /// Content type like 'ETH', 'EOS', or something else; the next 3 bytes + content_type: String, +} + +impl DbinHeader { + /// Checks if the version is supported. + fn is_supported_version(&self) -> bool { + is_supported_version(self.version) + } + + /// Reads and validates the `.dbin` header from the given [`Read`] source. + fn try_from_read(read: &mut R) -> Result { + let magic_bytes = read_magic_bytes(read)?; + if !magic_bytes_valid(&magic_bytes) { + return Err(DecoderError::MagicBytesInvalid); + } + read_header(read) + } + + fn read_string_field(read: &mut R, size: usize) -> Result { + let mut field_bytes = vec![0; size]; + read.read_exact(&mut field_bytes)?; + String::from_utf8(field_bytes).map_err(DecoderError::from) + } - let mut buf: [u8; 1] = [0; 1]; + /// Reads a single byte as a version or field. + fn read_version_field(read: &mut R) -> Result { + let mut buf = [0; HEADER_VERSION_SIZE]; read.read_exact(&mut buf)?; + Ok(buf[0]) + } +} - if buf[0] == 0 { - version = 0u8; - let mut content_type_bytes: [u8; 3] = [0; 3]; - read.read_exact(&mut content_type_bytes)?; +fn is_supported_version(version: u8) -> bool { + version == SUPPORTED_DBIN_VERSION +} - content_type = String::from_utf8(Vec::from(content_type_bytes)) - .map_err(DecoderError::InvalidUtf8)?; +fn magic_bytes_valid(bytes: &MagicBytes) -> bool { + bytes == MAGIC_BYTES +} - let mut content_version_bytes: [u8; 2] = [0; 2]; - read.read_exact(&mut content_version_bytes)?; +/// Reads and constructs a [`DbinHeader`] from the remaining fields after the magic bytes. +fn read_header(read: &mut R) -> Result { + let version = match DbinHeader::read_version_field(read) { + Ok(version) if is_supported_version(version) => version, + Ok(_) => return Err(DecoderError::VersionUnsupported), + Err(e) => return Err(e), + }; - content_version = String::from_utf8(Vec::from(content_version_bytes)) - .map_err(DecoderError::InvalidUtf8)?; - } else { - return Err(DecoderError::UnsupportedDbinVersion); - } + let content_type = DbinHeader::read_string_field(read, HEADER_CONTENT_TYPE_SIZE)?; - Ok(DbinHeader { - version, - content_type, - content_version, - }) + // Content version, represented as 10-based string, ranges in '00'-'99'; the next 2 bytes + let _content_version = DbinHeader::read_string_field(read, HEADER_CONTENT_VERSION_SIZE)?; + + Ok(DbinHeader { + version, + content_type, + }) +} + +fn read_magic_bytes(read: &mut R) -> Result { + let bytes = read_message(read, PREFIX_SIZE)?; + match bytes.try_into() { + Ok(magic_bytes) => Ok(magic_bytes), + Err(_) => Err(DecoderError::MagicBytesInvalid), } +} - /// Returns a `DbinFile` from a Reader - pub fn try_from_read(read: &mut R) -> Result { - let dbin_header = Self::read_header(read)?; - let mut messages: Vec> = vec![]; +/// Reads a single message, assuming the size-prefix format defined by `.dbin`. +fn read_message(read: &mut R, length: usize) -> Result { + let mut message = vec![0; length]; + read.read_exact(&mut message)?; + Ok(message) +} - loop { - match Self::read_message(read) { - Ok(message) => messages.push(message), - Err(e) => { - match e { - DecoderError::Io(io_error) => { - if io_error.kind() == std::io::ErrorKind::UnexpectedEof { - return Ok(DbinFile { - header: DbinHeader { - version: dbin_header.version, - content_type: dbin_header.content_type, - content_version: dbin_header.content_version, - }, - messages, - }); - } else if io_error.kind() == std::io::ErrorKind::Other { - // Check that version, content_type, and content_version match the previous header - let dbin_header_new = Self::read_partial_header(read)?; - if dbin_header.version != dbin_header_new.version - || dbin_header.content_type != dbin_header_new.content_type - || dbin_header.content_version - != dbin_header_new.content_version - { - return Err(DecoderError::DifferingDbinVersions); - } - } - } - // Catch all other variants of the error - e => return Err(e), - } - } - } - } +/// Read the next block from a flat file reader. +/// +/// Block messages are separated by "dbin" (the magical 4 bytes), so each +/// new occurrence marks the start of a new `.dbin` file +pub fn read_block_from_reader(read: &mut R) -> Result { + let mut magic_bytes = read_magic_bytes(read)?; + + if magic_bytes_valid(&magic_bytes) { + _ = read_header(read)?; + magic_bytes = read_magic_bytes(read)?; } + + let message_size = u32::from_be_bytes(magic_bytes) as usize; + + read_message(read, message_size) } -impl DbinFile { - /// Reads a single message - fn read_message(read: &mut R) -> Result, DecoderError> { - let mut size: [u8; 4] = [0; 4]; - read.read_exact(&mut size)?; +#[cfg(test)] +mod tests { + use super::*; + use std::io::Cursor; - if &size == b"dbin" { - return Err(DecoderError::StartOfNewDbinFile); - } + #[test] + fn test_valid_header_parsing() { + let data = [b'd', b'b', b'i', b'n', 0u8, b'E', b'T', b'H', b'0', b'1']; + let mut cursor = Cursor::new(data); - Ok(Self::read_content(size, read)?) + let header = DbinHeader::try_from_read(&mut cursor).expect("Failed to parse header"); + assert_eq!(header.version, SUPPORTED_DBIN_VERSION); + assert_eq!(header.content_type, "ETH"); } - /// Reads a stream of messages. - /// - /// Messages are separated by "dbin" (magical 4 bytes) so each - /// new occurrence of it marks the start of a new .dbin file - pub fn read_message_stream(read: &mut R) -> Result, DecoderError> { - let mut size: [u8; 4] = [0; 4]; - read.read_exact(&mut size)?; + #[test] + fn test_unsupported_version() { + let data = [b'd', b'b', b'i', b'n', 1u8, b'E', b'T', b'H', b'0', b'1']; + let mut cursor = Cursor::new(data); - if &size == b"dbin" { - _ = Self::read_partial_header(read)?; - size = [0; 4]; - read.read_exact(&mut size)?; - } + let result = DbinHeader::try_from_read(&mut cursor); + assert!(matches!(result, Err(DecoderError::VersionUnsupported))); + } + + #[test] + fn test_invalid_magic_bytes() { + let data = [b'x', b'y', b'z', b'n', 0u8, b'E', b'T', b'H', b'0', b'1']; + let mut cursor = Cursor::new(data); - Ok(Self::read_content(size, read)?) + let result = DbinHeader::try_from_read(&mut cursor); + assert!(matches!(result, Err(DecoderError::MagicBytesInvalid))); } - /// reads message bytes - fn read_content(size: [u8; 4], read: &mut R) -> Result, std::io::Error> { - let size = u32::from_be_bytes(size); - let mut content: Vec = vec![0; size as usize]; - read.read_exact(&mut content)?; - Ok(content) + #[test] + fn test_read_messages() { + let mut data = vec![]; + data.extend_from_slice(&[b'd', b'b', b'i', b'n', 0u8, b'E', b'T', b'H', b'0', b'1']); + data.extend_from_slice(&(4u32.to_be_bytes())); // message length + data.extend_from_slice(b"test"); + + let mut cursor = Cursor::new(data); + let dbin_file = DbinFile::try_from_read(&mut cursor).expect("Failed to read dbin file"); + + assert_eq!(dbin_file.messages.len(), 1); + assert_eq!(dbin_file.messages[0], b"test"); + } + + #[test] + fn test_end_of_file_handling() { + let mut data = vec![]; + data.extend_from_slice(&[b'd', b'b', b'i', b'n', 0u8, b'E', b'T', b'H', b'0', b'1']); + data.extend_from_slice(&(4u32.to_be_bytes())); // message length + data.extend_from_slice(b"test"); + + // truncate to simulate EOF after header + let mut cursor = Cursor::new(&data[..data.len() - 2]); + + let result = DbinFile::try_from_read(&mut cursor); + assert!( + matches!(result, Err(DecoderError::Io(ref e)) if e.kind() == io::ErrorKind::UnexpectedEof) + ); + } + + #[test] + fn test_iterator_behavior() { + let mut data = vec![]; + data.extend_from_slice(&[b'd', b'b', b'i', b'n', 0u8, b'E', b'T', b'H', b'0', b'1']); + data.extend_from_slice(&(4u32.to_be_bytes())); // message length + data.extend_from_slice(b"test"); + data.extend_from_slice(&(3u32.to_be_bytes())); // message length + data.extend_from_slice(b"123"); + + let mut cursor = Cursor::new(data); + let dbin_file = DbinFile::try_from_read(&mut cursor).expect("Failed to read dbin file"); + + let messages: Vec<_> = dbin_file.into_iter().collect(); + assert_eq!(messages.len(), 2); + assert_eq!(messages[0], b"test"); + assert_eq!(messages[1], b"123"); } } diff --git a/crates/flat-files-decoder/src/decoder.rs b/crates/flat-files-decoder/src/decoder.rs index 0b042f13..8ac74cc3 100644 --- a/crates/flat-files-decoder/src/decoder.rs +++ b/crates/flat-files-decoder/src/decoder.rs @@ -1,547 +1,219 @@ -use std::{ - fs::{self, File}, - io::{self, BufReader, BufWriter, Cursor, Read, Write}, - path::PathBuf, -}; +use std::io::{BufReader, Cursor, Read}; -use alloy_primitives::B256; -use clap::Parser; use firehose_protos::{ bstream, - ethereum_v2::{self, Block, BlockHeader}, + ethereum_v2::{self, Block}, }; use prost::Message; -use serde::{Deserialize, Serialize}; -use tokio::join; -use tracing::{error, info, trace}; -use zstd::stream::decode_all; - -use crate::{ - cli::{Cli, Commands}, - dbin::DbinFile, - decompression::Decompression, - error::DecoderError, -}; +use tracing::{error, info}; -pub async fn run() -> Result<(), DecoderError> { - let cli = Cli::parse(); +use crate::{dbin::read_block_from_reader, error::DecoderError, DbinFile}; - match cli.command { - Commands::Stream { - decompression, - end_block, - } => match decompression { - Decompression::Zstd => { - let reader = zstd::stream::Decoder::new(io::stdin())?; - let writer = BufWriter::new(io::stdout().lock()); - stream_blocks(reader, writer, end_block).await - } - Decompression::None => { - let reader = BufReader::with_capacity((64 * 2) << 20, io::stdin().lock()); - let writer = BufWriter::new(io::stdout().lock()); - stream_blocks(reader, writer, end_block).await - } - }, - Commands::Decode { - input, - headers_dir, - output, - decompression, - } => { - let blocks = decode_flat_files( - input, - output.as_deref(), - headers_dir.as_deref(), - decompression, - )?; - - info!("Total blocks: {}", blocks.len()); - - Ok(()) - } - } +/// Work with data compression, including zstd. +#[derive(Clone, Copy, Debug, Default)] +pub enum Compression { + Zstd, + #[default] + None, } -/// Decodes and optionally verifies block flat files from a given directory or single file. -/// -/// This function processes input which can be a file or a directory containing multiple `.dbin` files. -/// If `headers_dir` is provided, it verifies the block headers against the files found in this directory. -/// These header files must be in JSON format and named after the block number they represent (e.g., `block-.json`). -/// it can also handle `zstd` compressed flat files. -/// -/// # Arguments -/// -/// * `input`: A [`String`] specifying the path to the input directory or file. -/// * `output`: An [`Option<&str>`] specifying the directory where decoded blocks should be written. -/// If `None`, decoded blocks are not written to disk. -/// * `headers_dir`: An [`Option<&str>`] specifying the directory containing header files for verification. -/// Must be a directory if provided. -/// * `decompress`: A [`Decompression`] enum specifying if it is necessary to decompress from zstd. -pub fn decode_flat_files( - input: String, - output: Option<&str>, - headers_dir: Option<&str>, - decompress: Decompression, -) -> Result, DecoderError> { - let metadata = fs::metadata(&input)?; - - if let Some(output) = output { - fs::create_dir_all(output)?; - } - - if metadata.is_dir() { - decode_flat_files_dir(&input, output, headers_dir, decompress) - } else if metadata.is_file() { - handle_file(&PathBuf::from(input), output, headers_dir, decompress) - } else { - Err(DecoderError::InvalidInput) +impl From<&str> for Compression { + fn from(value: &str) -> Self { + match value.to_lowercase().as_str() { + "true" | "1" => Compression::Zstd, + _ => Compression::None, + } } } -fn decode_flat_files_dir( - input: &str, - output: Option<&str>, - headers_dir: Option<&str>, - decompress: Decompression, -) -> Result, DecoderError> { - info!("Processing directory: {}", input); - let paths = fs::read_dir(input)?; - - let mut blocks: Vec = vec![]; - for path in paths { - let path = path?; - match path.path().extension() { - Some(ext) => { - if ext != "dbin" { - continue; - } - } - None => continue, - }; - - trace!("Processing file: {}", path.path().display()); - match handle_file(&path.path(), output, headers_dir, decompress) { - Ok(file_blocks) => { - blocks.extend(file_blocks); - } - Err(err) => { - error!("Failed to process file: {}", err); - } +impl From for Compression { + fn from(value: bool) -> Self { + match value { + true => Compression::Zstd, + false => Compression::None, } } - - Ok(blocks) } -/// Decodes and optionally verifies block flat files from a single file. +/// Read blocks from a flat file reader. /// -/// This function decodes flat files and, if an `output` directory is provided, writes the decoded blocks to this directory. -/// If no `output` is specified, the decoded blocks are not written to disk. The function can also verify block headers -/// against header files found in an optional `headers_dir`. These header files must be in JSON format and named after -/// the block number they represent (e.g., `block-.json`). Additionally, the function supports handling `zstd` compressed -/// flat files if decompression is required. +/// This function processes flat files that are already loaded into memory, supporting both +/// compressed (Zstd) and uncompressed data. If the data is successfully decoded, it returns a +/// vector of `Block` structs representing the blocks contained within the file. The number of +/// blocks returned depends on the file's content and format, which may include one or more blocks. /// /// # Arguments /// -/// * `input`: A [`String`] specifying the path to the file. -/// * `output`: An [`Option<&str>`] specifying the directory where decoded blocks should be written. -/// If `None`, decoded blocks are not written to disk. -/// * `headers_dir`: An [`Option<&str>`] specifying the directory containing header files for verification. -/// Must be a directory if provided. -/// * `decompress`: A [`Decompression`] enum indicating whether decompression from `zstd` format is necessary. -/// -pub fn handle_file( - path: &PathBuf, - output: Option<&str>, - headers_dir: Option<&str>, - decompress: Decompression, +/// * `reader`: A readable source of the file contents, implementing the `Read` trait. +/// * `compression`: The compression type applied to the flat file's data, if any. Accepts `Compression::Zstd` +/// for Zstd-compressed data, or `Compression::None` for uncompressed data. +pub fn read_blocks_from_reader( + reader: R, + compression: Compression, ) -> Result, DecoderError> { - let input_file = BufReader::new(File::open(path)?); + const CONTENT_TYPE: &str = "ETH"; - // Check if decompression is required and read the file accordingly. - let mut file_contents: Box = match decompress { - Decompression::Zstd => { - let decompressed_data = decode_all(input_file)?; - Box::new(Cursor::new(decompressed_data)) - } - Decompression::None => Box::new(input_file), + let mut file_contents: Box = match compression { + Compression::Zstd => Box::new(Cursor::new(zstd::decode_all(reader)?)), + Compression::None => Box::new(reader), }; let dbin_file = DbinFile::try_from_read(&mut file_contents)?; - if dbin_file.header.content_type != "ETH" { - return Err(DecoderError::InvalidContentType( - dbin_file.header.content_type, + if dbin_file.content_type() != CONTENT_TYPE { + return Err(DecoderError::ContentTypeInvalid( + dbin_file.content_type().to_string(), )); } - let mut blocks: Vec = vec![]; - - for message in dbin_file.messages { - blocks.push(handle_block(&message, output, headers_dir)?); - } - - Ok(blocks) -} - -/// Decodes a flat file from a buffer containing its contents and optionally decompresses it. -/// -/// Decodes flat files that are already loaded into memory, without direct file system access. -/// It can handle both compressed (if `zstd` decompression is specified) and uncompressed data. Upon successful -/// decoding, it returns a vector of all the blocks contained within the flat file. The actual number of blocks -/// returned depends on the format and content of the flat file—ranging from a single block to multiple blocks. -/// -/// # Arguments -/// -/// * `buf`: A byte slice referencing the in-memory content of the flat file to be decoded. -/// * `decompress`: A boolean indicating whether the input buffer should be decompressed. -/// -pub fn handle_buf(buf: &[u8], decompress: Decompression) -> Result, DecoderError> { - let buf = match decompress { - Decompression::Zstd => zstd::decode_all(buf)?, - Decompression::None => buf.to_vec(), - }; - - let dbin_file = DbinFile::try_from_read(&mut Cursor::new(buf))?; - - let mut blocks: Vec = vec![]; - - for message in dbin_file.messages { - blocks.push(handle_block(&message, None, None)?); - } - Ok(blocks) -} - -/// A struct to hold the receipt and transactions root for a `Block`. -/// This struct is used to compare the receipt and transactions roots of a block -/// with the receipt and transactions roots of another block. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct BlockHeaderRoots { - receipt_root: B256, - transactions_root: B256, -} - -impl TryFrom<&Block> for BlockHeaderRoots { - type Error = DecoderError; - - fn try_from(block: &Block) -> Result { - block.header()?.try_into() - } -} - -impl TryFrom<&BlockHeader> for BlockHeaderRoots { - type Error = DecoderError; - - fn try_from(header: &BlockHeader) -> Result { - let receipt_root: [u8; 32] = header.receipt_root.as_slice().try_into()?; - let transactions_root: [u8; 32] = header.transactions_root.as_slice().try_into()?; - - Ok(Self { - receipt_root: receipt_root.into(), - transactions_root: transactions_root.into(), - }) - } -} - -impl BlockHeaderRoots { - /// Checks if the receipt and transactions roots of a block header match the receipt and transactions roots of another block. - pub fn block_header_matches(&self, block: &Block) -> bool { - let block_header_roots = match block.try_into() { - Ok(block_header_roots) => block_header_roots, - Err(e) => { - error!("Failed to convert block to header roots: {e}"); - return false; + dbin_file + .into_iter() + .map(|message| { + let block = decode_block_from_bytes(&message)?; + if !block_is_verified(&block) { + Err(DecoderError::VerificationFailed { + block_number: block.number, + }) + } else { + Ok(block) } - }; - - self.block_header_roots_match(&block_header_roots) - } - - fn block_header_roots_match(&self, block_header_roots: &BlockHeaderRoots) -> bool { - self == block_header_roots - } + }) + .collect() } -fn handle_block( - message: &Vec, - output: Option<&str>, - headers_dir: Option<&str>, -) -> Result { - let block = decode_block_from_bytes(message)?; - - if let Some(headers_dir) = headers_dir { - let header_file_path = format!("{}/{}.json", headers_dir, block.number); - let header_file = File::open(header_file_path)?; - let header_roots: BlockHeaderRoots = serde_json::from_reader(header_file)?; - - if !header_roots.block_header_matches(&block) { - return Err(DecoderError::MatchRootsFailed { - block_number: block.number, - }); - } - } - +fn block_is_verified(block: &Block) -> bool { if block.number != 0 { if !block.receipt_root_is_verified() { - return Err(DecoderError::ReceiptRoot); + error!( + "Receipt root verification failed for block {}", + block.number + ); + return false; } if !block.transaction_root_is_verified() { - return Err(DecoderError::TransactionRoot); + error!( + "Transaction root verification failed for block {}", + block.number + ); + return false; } } + true +} - if let Some(output) = output { - let file_name = format!("{}/block-{}.json", output, block.number); - let mut out_file = File::create(file_name)?; +/// Read from an extendable set of reader inputs. +/// +/// It can be a BufReader or a StdIn reader with or without compression +/// The BufReader is used when the data is already loaded into memory, +/// assuming that the data is not compressed. +#[derive(Debug)] +pub enum Reader { + Buf(BufReader>>), + StdIn(Compression), +} + +impl Reader { + pub(crate) fn into_reader(self) -> Result, DecoderError> { + match self { + Reader::StdIn(compression) => match compression { + Compression::Zstd => Ok(Box::new(zstd::stream::Decoder::new(std::io::stdin())?)), + Compression::None => Ok(Box::new(BufReader::with_capacity( + (64 * 2) << 20, + std::io::stdin().lock(), + ))), + }, + Reader::Buf(reader) => Ok(Box::new(reader)), + } + } +} - let block_json = serde_json::to_string(&block)?; +impl TryFrom for Box { + type Error = DecoderError; - out_file.write_all(block_json.as_bytes())?; + fn try_from(reader: Reader) -> Result { + reader.into_reader() } - - Ok(block) } -#[derive(Serialize, Deserialize)] -struct HeaderRecordWithNumber { - block_hash: Vec, - block_number: u64, - total_difficulty: Vec, +/// Set the end block for the range of blocks to read, decode, and verify. +/// +/// Enum to handle the end block of the stream +/// It can be the merge block or a specific block number +pub enum EndBlock { + MergeBlock, + Block(u64), +} + +impl EndBlock { + fn block_number(&self) -> u64 { + const LAST_PREMERGE_BLOCK: u64 = 15537393; + match self { + EndBlock::MergeBlock => LAST_PREMERGE_BLOCK, + EndBlock::Block(block_number) => *block_number, + } + } } -impl TryFrom<&Block> for HeaderRecordWithNumber { - type Error = DecoderError; - - fn try_from(block: &Block) -> Result { - let block_header = block.header()?; - - let header_record_with_number = HeaderRecordWithNumber { - block_hash: block.hash.clone(), - total_difficulty: block_header - .total_difficulty - .as_ref() - .ok_or(Self::Error::InvalidTotalDifficulty)? - .bytes - .clone(), - block_number: block.number, - }; - Ok(header_record_with_number) +impl From> for EndBlock { + fn from(value: Option) -> Self { + value.map_or(EndBlock::MergeBlock, EndBlock::Block) } } -/// Decode blocks from a reader and writes them, serialized, to a writer +/// Get an iterator of decoded, verified blocks from a reader. /// -/// data can be piped into this function from stdin via `cargo run stream < ./example0017686312.dbin`. -/// It also has a check for end_block. By default, it stops the stream reading when MERGE_BLOCK -/// is reached. +/// Skips invalid blocks and returns an iterator of verified blocks. /// /// # Arguments /// -/// * `end_block`: For blocks after the merge, Ethereum sync committee should be used. This is why the default block -/// for this param is the LAST_PREMERGE_BLOCK (block 15537393) -/// * `reader`: where bytes are read from -/// * `writer`: where bytes written to -pub async fn stream_blocks( - mut reader: R, - mut writer: W, - end_block: Option, -) -> Result<(), DecoderError> { - const LAST_PREMERGE_BLOCK: usize = 15537393; +/// * `reader`: A [`Reader`] enum that specifies the source of the block data. The reader can be a +/// [`BufReader`] or a `StdIn` reader with or without compression. +/// * `end_block`: Specifies the block number at which to stop streaming. By default, this is set to +/// `LAST_PREMERGE_BLOCK` (block 15537393), which marks the last block before the Ethereum merge. +pub fn stream_blocks( + reader: Reader, + end_block: EndBlock, +) -> Result, DecoderError> { + let mut current_block_number = 0; - let end_block = match end_block { - Some(end_block) => end_block, - None => LAST_PREMERGE_BLOCK, - }; + let mut reader = reader.into_reader()?; + let end_block = end_block.block_number(); - let mut block_number = 0; + let mut blocks = Vec::new(); loop { - match DbinFile::read_message_stream(&mut reader) { + match read_block_from_reader(&mut reader) { Ok(message) => { - let block = decode_block_from_bytes(&message)?; + match decode_block_from_bytes(&message) { + Ok(block) => { + current_block_number = block.number; - block_number = block.number as usize; - - let receipts_check_process = - spawn_check(&block, |b| match b.receipt_root_is_verified() { - true => Ok(()), - false => Err(DecoderError::ReceiptRoot), - }); - - let transactions_check_process = - spawn_check(&block, |b| match b.transaction_root_is_verified() { - true => Ok(()), - false => Err(DecoderError::TransactionRoot), - }); - - let joint_return = join![receipts_check_process, transactions_check_process]; - joint_return.0?; - joint_return.1?; - - let header_record_with_number = HeaderRecordWithNumber::try_from(&block)?; - let header_record_bin = bincode::serialize(&header_record_with_number)?; - - let size = header_record_bin.len() as u32; - writer.write_all(&size.to_be_bytes())?; - writer.write_all(&header_record_bin)?; - writer.flush()?; - } - Err(e) => { - if let DecoderError::Io(ref e) = e { - if e.kind() == std::io::ErrorKind::UnexpectedEof { - if block_number < end_block { - info!("Reached end of file, waiting for more blocks"); - // More blocks to read - continue; + if block_is_verified(&block) { + blocks.push(block); } else { - // All blocks have been read - break; + info!("Block verification failed, skipping block {}", block.number); } } + Err(e) => return Err(e), + }; + } + Err(DecoderError::Io(e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => { + if current_block_number < end_block { + info!("Reached end of file, waiting for more blocks"); + continue; } - - error!("Error reading dbin file: {}", e); break; } + Err(e) => return Err(e), } } - Ok(()) + + Ok(blocks.into_iter()) } -fn decode_block_from_bytes(bytes: &Vec) -> Result { - let block_stream = bstream::v1::Block::decode(bytes.as_slice())?; +/// Decodes a block from a byte slice. +fn decode_block_from_bytes(bytes: &[u8]) -> Result { + let block_stream = bstream::v1::Block::decode(bytes)?; let block = ethereum_v2::Block::decode(block_stream.payload_buffer.as_slice())?; Ok(block) } - -// Define a generic function to spawn a blocking task for a given check. -fn spawn_check(block: &Block, check: F) -> tokio::task::JoinHandle<()> -where - F: FnOnce(&Block) -> Result<(), DecoderError> + Send + 'static, -{ - let block_clone = block.clone(); - tokio::task::spawn_blocking(move || match check(&block_clone) { - Ok(_) => {} - Err(err) => { - error!("{}", err); - } - }) -} - -#[cfg(test)] -mod tests { - use firehose_protos::bstream::v1::Block as BstreamBlock; - use std::io::{BufReader, BufWriter}; - - use super::*; - - const TEST_ASSET_PATH: &str = "../../test-assets"; - - #[test] - fn test_handle_file() { - let path = PathBuf::from(format!("{TEST_ASSET_PATH}/example0017686312.dbin")); - - let result = handle_file(&path, None, None, Decompression::None); - - assert!(result.is_ok()); - } - - #[test] - fn test_handle_file_zstd() { - let path = PathBuf::from(format!("{TEST_ASSET_PATH}/0000000000.dbin.zst")); - - let result = handle_file(&path, None, None, Decompression::Zstd); - - assert!(result.is_ok()); - let blocks: Vec = result.unwrap(); - assert_eq!(blocks[0].number, 0); - } - - #[test] - fn test_check_valid_root_fail() { - let path = PathBuf::from(format!("{TEST_ASSET_PATH}/example0017686312.dbin")); - let mut file = BufReader::new(File::open(path).expect("Failed to open file")); - let dbin_file: DbinFile = - DbinFile::try_from_read(&mut file).expect("Failed to parse dbin file"); - - let message = dbin_file.messages[0].clone(); - - let block_stream = BstreamBlock::decode(message.as_slice()).unwrap(); - let mut block = Block::decode(block_stream.payload_buffer.as_slice()).unwrap(); - - assert!(block.receipt_root_is_verified()); - - // Remove an item from the block to make the receipt root invalid - block.transaction_traces.pop(); - - assert!(!block.receipt_root_is_verified()); - } - - #[test] - fn test_block_stream() { - let mut buffer = Vec::new(); - let cursor: Cursor<&mut Vec> = Cursor::new(&mut buffer); - let inputs = vec![ - format!("{TEST_ASSET_PATH}/example-create-17686085.dbin"), - format!("{TEST_ASSET_PATH}/example0017686312.dbin"), - ]; - { - let mut writer = BufWriter::new(cursor); - for i in inputs { - let mut input = File::open(i).expect("couldn't read input file"); - - std::io::copy(&mut input, &mut writer).expect("couldn't copy"); - writer.flush().expect("failed to flush output"); - } - } - let mut cursor = Cursor::new(&buffer); - cursor.set_position(0); - - let reader = BufReader::new(cursor); - let mut in_buffer = Vec::new(); - let writer = BufWriter::new(Cursor::new(&mut in_buffer)); - - matches!( - tokio_test::block_on(stream_blocks(reader, writer, None)), - Ok(()) - ); - } - - #[test] - fn test_handle_buff() { - let path = PathBuf::from(format!("{TEST_ASSET_PATH}/example0017686312.dbin")); - let file = BufReader::new(File::open(path).expect("Failed to open file")); - let mut reader = BufReader::new(file); - - let mut buffer = Vec::new(); - - reader - .read_to_end(&mut buffer) - .expect("Failed to read file"); - - let result = handle_buf(&buffer, Decompression::None); - if let Err(e) = result { - panic!("handle_buf failed: {}", e); - } - assert!(result.is_ok(), "handle_buf should complete successfully"); - } - - #[test] - fn test_handle_buff_decompress() { - let path = PathBuf::from(format!("{TEST_ASSET_PATH}/0000000000.dbin.zst")); - let file = BufReader::new(File::open(path).expect("Failed to open file")); - let mut reader = BufReader::new(file); - - let mut buffer = Vec::new(); - - reader - .read_to_end(&mut buffer) - .expect("Failed to read file"); - - let result = handle_buf(&buffer, Decompression::Zstd); - assert!( - result.is_ok(), - "handle_buf should complete successfully with decompression" - ); - } -} diff --git a/crates/flat-files-decoder/src/decompression.rs b/crates/flat-files-decoder/src/decompression.rs deleted file mode 100644 index 830b9eca..00000000 --- a/crates/flat-files-decoder/src/decompression.rs +++ /dev/null @@ -1,25 +0,0 @@ -#[derive(Clone, Copy, Debug, Default)] -pub enum Decompression { - Zstd, - #[default] - None, -} - -impl From<&str> for Decompression { - fn from(value: &str) -> Self { - match value { - "true" | "1" => Decompression::Zstd, - "false" | "0" => Decompression::None, - _ => Decompression::None, - } - } -} - -impl From for Decompression { - fn from(value: bool) -> Self { - match value { - true => Decompression::Zstd, - false => Decompression::None, - } - } -} diff --git a/crates/flat-files-decoder/src/error.rs b/crates/flat-files-decoder/src/error.rs index 04a1d054..ef4e4d20 100644 --- a/crates/flat-files-decoder/src/error.rs +++ b/crates/flat-files-decoder/src/error.rs @@ -1,43 +1,44 @@ use thiserror::Error; +/// Get custom error variants for issues with reading, decoding, and verifying flat files. #[derive(Debug, Error)] pub enum DecoderError { #[error("Bin code error: {0}")] Bincode(#[from] bincode::Error), - #[error("dbin files with different versions")] - DifferingDbinVersions, + #[error("Invalid flat file bytes")] + BytesInvalid, + #[error("Invalid flat file content type: {0}")] + ContentTypeInvalid(String), + #[error("Protos error: {0}")] + FirehoseProtosError(#[from] firehose_protos::error::ProtosError), + #[error("Unsupported format: {0:?}")] + FormatUnsupported(Option), + #[error("Invalid header")] + HeaderInvalid, #[error("I/O error: {0}")] Io(#[from] std::io::Error), - #[error("Invalid content type: {0}")] - InvalidContentType(String), - #[error("Incorrect dbin bytes")] - InvalidDbinBytes, - #[error("Invalid header")] - InvalidHeader, - #[error("Invalid input")] - InvalidInput, - #[error("Invalid block header total difficulty")] - InvalidTotalDifficulty, - #[error("Invalid UTF8: {0}")] - InvalidUtf8(#[from] std::string::FromUtf8Error), - #[error("Join error: {0}")] - JoinError(#[from] tokio::task::JoinError), - #[error("Block header JSON Error: {0}")] - JsonError(#[from] serde_json::Error), + #[error("{0}")] + Json(#[from] serde_json::Error), + #[error("Magic bytes at start of file are invalid")] + MagicBytesInvalid, #[error("Failed to match roots for block number {block_number}")] MatchRootsFailed { block_number: u64 }, #[error("Protobuf decode error: {0}")] ProtobufDecode(#[from] prost::DecodeError), - #[error("Protos error: {0}")] - ProtosError(#[from] firehose_protos::error::ProtosError), #[error("Invalid Receipt Root")] - ReceiptRoot, - #[error("Start of new dbin file")] - StartOfNewDbinFile, + ReceiptRootInvalid, + #[error("Invalid block header total difficulty")] + TotalDifficultyInvalid, #[error("Invalid Transaction Root")] - TransactionRoot, + TransactionRootInvalid, #[error("TryFromSliceError: {0}")] - TryFromSliceError(#[from] std::array::TryFromSliceError), - #[error("Unsupported version")] - UnsupportedDbinVersion, + TryFromSlice(#[from] std::array::TryFromSliceError), + #[error("{0}")] + Utf8(#[from] std::string::FromUtf8Error), + #[error("Block verification failed {block_number}")] + VerificationFailed { block_number: u64 }, + #[error("Flat files with different versions")] + VersionConflict, + #[error("Unsupported flat file version")] + VersionUnsupported, } diff --git a/crates/flat-files-decoder/src/lib.rs b/crates/flat-files-decoder/src/lib.rs index b9503cca..4a2c7d22 100644 --- a/crates/flat-files-decoder/src/lib.rs +++ b/crates/flat-files-decoder/src/lib.rs @@ -1,12 +1,11 @@ -//! # Flat Files Decoder for Firehose +//! # Flat Files Decoder //! -//! This crate offers utility functions for reading and verifying flat files stored on disk. -//! The verifier checks computed receipts and transaction roots against those specified in the -//! block header. Additionally, it can optionally verify the block headers against a directory -//! of JSON-formatted block headers. +//! Read, decode, and verify flat files containing Ethereum blocks. -pub mod cli; -pub mod dbin; -pub mod decoder; -pub mod decompression; -pub mod error; +mod dbin; +mod decoder; +mod error; + +pub use dbin::*; +pub use decoder::*; +pub use error::*; diff --git a/crates/flat-files-decoder/src/main.rs b/crates/flat-files-decoder/src/main.rs deleted file mode 100644 index c627a796..00000000 --- a/crates/flat-files-decoder/src/main.rs +++ /dev/null @@ -1,28 +0,0 @@ -use flat_files_decoder::decoder::run; -use std::process::ExitCode; -use tracing::{error, level_filters::LevelFilter, subscriber::set_global_default}; -use tracing_subscriber::{EnvFilter, FmtSubscriber}; - -#[tokio::main] -async fn main() -> ExitCode { - init_tracing(); - if let Err(e) = run().await { - error!("Decoder error: {e}"); - return ExitCode::from(1); - } - ExitCode::SUCCESS -} - -fn init_tracing() { - let filter = EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(); - let subscriber_builder: tracing_subscriber::fmt::SubscriberBuilder< - tracing_subscriber::fmt::format::DefaultFields, - tracing_subscriber::fmt::format::Format, - EnvFilter, - > = FmtSubscriber::builder().with_env_filter(filter); - set_global_default(subscriber_builder.with_ansi(true).pretty().finish()).expect( - "Failed to set up the global default subscriber for logging. Please check if the RUST_LOG environment variable is set correctly.", - ); -} diff --git a/crates/flat-files-decoder/tests/decode.rs b/crates/flat-files-decoder/tests/decode.rs index 234b4c8a..9c7c0244 100644 --- a/crates/flat-files-decoder/tests/decode.rs +++ b/crates/flat-files-decoder/tests/decode.rs @@ -1,25 +1,50 @@ -use flat_files_decoder::{decoder::decode_flat_files, decompression::Decompression}; +use firehose_protos::{bstream::v1::Block as BstreamBlock, ethereum_v2::Block}; +use flat_files_decoder::{ + read_blocks_from_reader, stream_blocks, Compression, DbinFile, EndBlock, Reader, +}; +use prost::Message; +use std::{ + fs::File, + io::{BufReader, BufWriter, Cursor, Write}, + path::PathBuf, +}; const BLOCK_NUMBER: usize = 0; const TEST_ASSET_PATH: &str = "../../test-assets"; +fn create_test_reader(path: &str) -> BufReader { + BufReader::new(File::open(path).unwrap()) +} + +#[test] +fn test_dbin_try_from_read() { + let mut reader = + BufReader::new(File::open(format!("{TEST_ASSET_PATH}/example0017686312.dbin")).unwrap()); + + let dbin_file = DbinFile::try_from_read(&mut reader).unwrap(); + + insta::assert_debug_snapshot!(dbin_file.content_type(), @r###""ETH""###); +} + #[test] fn test_decode_decompressed() { - let file_name = format!("{TEST_ASSET_PATH}/{:010}.dbin", BLOCK_NUMBER); - let blocks = decode_flat_files(file_name, None, None, Decompression::None).unwrap(); + let file = format!("{TEST_ASSET_PATH}/{:010}.dbin", BLOCK_NUMBER); + let blocks = + read_blocks_from_reader(create_test_reader(file.as_str()), Compression::None).unwrap(); assert_eq!(blocks.len(), 100); } #[test] fn test_decode_compressed() { - let file_name = format!("{TEST_ASSET_PATH}/{:010}.dbin.zst", BLOCK_NUMBER); - let blocks_compressed = decode_flat_files(file_name, None, None, Decompression::Zstd).unwrap(); + let file = format!("{TEST_ASSET_PATH}/{:010}.dbin.zst", BLOCK_NUMBER); + let blocks_compressed = + read_blocks_from_reader(create_test_reader(file.as_str()), Compression::Zstd).unwrap(); assert_eq!(blocks_compressed.len(), 100); - let file_name = format!("{TEST_ASSET_PATH}/{:010}.dbin", BLOCK_NUMBER); + let file = format!("{TEST_ASSET_PATH}/{:010}.dbin", BLOCK_NUMBER); let blocks_decompressed = - decode_flat_files(file_name, None, None, Decompression::None).unwrap(); + read_blocks_from_reader(create_test_reader(file.as_str()), Compression::None).unwrap(); assert_eq!(blocks_compressed.len(), blocks_decompressed.len()); for (b1, b2) in blocks_compressed.into_iter().zip(blocks_decompressed) { assert_eq!(b1.hash, b2.hash); @@ -34,3 +59,91 @@ fn test_decode_compressed() { assert_eq!(b1.system_calls, b2.system_calls); } } + +#[test] +fn test_handle_file() { + let file = format!("{TEST_ASSET_PATH}/example0017686312.dbin"); + let result = read_blocks_from_reader(create_test_reader(file.as_str()), Compression::None); + assert!(result.is_ok()); +} + +#[test] +fn test_handle_file_zstd() { + let file = format!("{TEST_ASSET_PATH}/0000000000.dbin.zst"); + let result = read_blocks_from_reader(create_test_reader(file.as_str()), Compression::Zstd); + assert!(result.is_ok()); + let blocks: Vec = result.unwrap(); + assert_eq!(blocks[0].number, 0); +} + +#[test] +fn test_check_valid_root_fail() { + let path = PathBuf::from(format!("{TEST_ASSET_PATH}/example0017686312.dbin")); + let mut file = BufReader::new(File::open(path).expect("Failed to open file")); + let dbin_file: DbinFile = + DbinFile::try_from_read(&mut file).expect("Failed to parse dbin file"); + + let message = dbin_file.into_iter().next().unwrap(); + + let block_stream = BstreamBlock::decode(message.as_slice()).unwrap(); + let mut block = Block::decode(block_stream.payload_buffer.as_slice()).unwrap(); + + assert!(block.receipt_root_is_verified()); + + // Remove an item from the block to make the receipt root invalid + block.transaction_traces.pop(); + + assert!(!block.receipt_root_is_verified()); +} + +#[test] +fn test_stream_blocks() { + let mut buffer = Vec::new(); + let cursor: Cursor<&mut Vec> = Cursor::new(&mut buffer); + let inputs = vec![ + format!("{TEST_ASSET_PATH}/example-create-17686085.dbin"), + format!("{TEST_ASSET_PATH}/example0017686312.dbin"), + ]; + { + let mut writer = BufWriter::new(cursor); + for i in inputs { + let mut input = File::open(i).expect("couldn't read input file"); + + std::io::copy(&mut input, &mut writer).expect("couldn't copy"); + writer.flush().expect("failed to flush output"); + } + } + let mut cursor = Cursor::new(buffer); + cursor.set_position(0); + + let reader = BufReader::new(cursor); + + let blocks: Vec = stream_blocks(Reader::Buf(reader), EndBlock::MergeBlock) + .unwrap() + .collect(); + + assert_eq!(blocks.len(), 2); + assert_eq!(blocks[0].number, 17686164); + assert_eq!(blocks[1].number, 17686312); +} + +#[test] +fn test_handle_reader() { + let path = PathBuf::from(format!("{TEST_ASSET_PATH}/example0017686312.dbin")); + let file = BufReader::new(File::open(path).expect("Failed to open file")); + let result = read_blocks_from_reader(file, false.into()); + if let Err(e) = result { + panic!("handle_buf failed: {}", e); + } + assert!(result.is_ok(), "handle_buf should complete successfully"); +} + +#[test] +fn test_handle_reader_compressed() { + let file = format!("{TEST_ASSET_PATH}/0000000000.dbin.zst"); + let result = read_blocks_from_reader(create_test_reader(file.as_str()), Compression::Zstd); + assert!( + result.is_ok(), + "handle_buf should complete successfully with decompression" + ); +} diff --git a/crates/flat-files-decoder/tests/transactions.rs b/crates/flat-files-decoder/tests/transactions.rs deleted file mode 100644 index 5f661dbb..00000000 --- a/crates/flat-files-decoder/tests/transactions.rs +++ /dev/null @@ -1,151 +0,0 @@ -use flat_files_decoder::dbin::DbinFile; - -use alloy_primitives::{Address, Bytes, Parity, TxHash, TxKind, U256}; -use firehose_protos::ethereum_v2::Block; -use firehose_protos::{bstream::v1::Block as BstreamBlock, ethereum_v2::transaction_trace::Type}; -use prost::Message; -use reth_primitives::TransactionSigned; -use reth_primitives::TxType; -use std::{fs::File, io::BufReader, str::FromStr}; - -const TEST_ASSET_PATH: &str = "../../test-assets"; - -#[test] -fn example_file_first_tx() { - let mut input_file = - BufReader::new(File::open(format!("{TEST_ASSET_PATH}/example0017686312.dbin")).unwrap()); - - let dbin_file = DbinFile::try_from_read(&mut input_file).unwrap(); - - let message = dbin_file.messages.first().unwrap(); - - let message = BstreamBlock::decode(message.as_slice()).unwrap(); - - let block = Block::decode(message.payload_buffer.as_slice()).unwrap(); - - let trace = block.transaction_traces.first().unwrap(); - - let transaction = TransactionSigned::try_from(trace).unwrap(); - - let tx_details = transaction.transaction; - - assert_eq!(tx_details.value(), U256::from(0)); - assert_eq!(tx_details.nonce(), 3807); - - assert_eq!(tx_details.max_fee_per_gas(), 141_363_047_052); - assert_eq!( - tx_details.max_priority_fee_per_gas().unwrap(), - 2_500_000_000 - ); - - assert_eq!(tx_details.gas_limit(), 149_194); - - assert_eq!( - tx_details.to().unwrap(), - Address::from_str("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D").unwrap() - ); - - assert_eq!(*tx_details.input(), Bytes::from_str("0x38ed1739000000000000000000000000000000000000000000000000482a1c73000800000000000000000000000000000000000000000009c14e785bf4910843948926c200000000000000000000000000000000000000000000000000000000000000a00000000000000000000000006b4b968dcecfd3d197ce04dc8925f919308153660000000000000000000000000000000000000000000000000000000064b040870000000000000000000000000000000000000000000000000000000000000002000000000000000000000000c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2000000000000000000000000370a366f402e2e41cdbbe54ecec12aae0cce1955").unwrap()); - - assert_eq!(tx_details.tx_type(), TxType::Eip1559); - assert_eq!(tx_details.chain_id(), Some(1)); - - assert_eq!( - tx_details.kind(), - TxKind::Call(Address::from_str("0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D").unwrap()) - ); - - let signature = transaction.signature; - - assert_eq!(signature.v(), Parity::Parity(false)); - assert_eq!( - signature.r(), - U256::from_str("0x0c8ee5280894c443ad128321d3f682c257afef878c5be9c18028b9570414213e") - .unwrap() - ); - assert_eq!( - signature.s(), - U256::from_str("0x0318b26186566acbe046e9d9caaa02444f730f4e9023c835530e622e357f3fdd") - .unwrap() - ); - - assert_eq!( - transaction.hash, - TxHash::from_str("0x5d8438a6c6336b90ca42a73c4e4ea8985fdfc3e2526af38592894353fd9d0d39") - .unwrap() - ) -} - -#[test] -fn legacy_tx() { - let mut input_file = - BufReader::new(File::open(format!("{TEST_ASSET_PATH}/example0017686312.dbin")).unwrap()); - - let dbin_file = DbinFile::try_from_read(&mut input_file).unwrap(); - - let message = dbin_file.messages.first().unwrap(); - - let message = BstreamBlock::decode(message.as_slice()).unwrap(); - - let block = Block::decode(message.payload_buffer.as_slice()).unwrap(); - - let trace = block - .transaction_traces - .iter() - .find(|t| Type::try_from(t.r#type).unwrap() == Type::TrxTypeLegacy) - .unwrap(); - - let transaction = TransactionSigned::try_from(trace).unwrap(); - - let signature = transaction.signature; - - assert_eq!(transaction.transaction.tx_type(), TxType::Legacy); - - assert_eq!(transaction.transaction.chain_id(), Some(1)); - - assert_eq!(signature.v(), Parity::Parity(true)); - assert_eq!( - signature.r(), - U256::from_str("0x44c2b52e2e291f1c13f572ff786039d4520955b640eae90d3c3d9a2117b0638b") - .unwrap() - ); - assert_eq!( - signature.s(), - U256::from_str("0x2a15dc9fd6c495a4a65015c3c6e41f480626741e78008091415b26410e209902") - .unwrap() - ); - - assert_eq!( - transaction.hash, - TxHash::from_str("0xa074bc87b8bb4120b77c5991f9d9fe2e1df45c58d891aa1aafb0edd5bf914f8f") - .unwrap() - ); -} - -#[test] -fn create_tx() { - let mut input_file = BufReader::new( - File::open(format!("{TEST_ASSET_PATH}/example-create-17686085.dbin")).unwrap(), - ); - - let dbin_file = DbinFile::try_from_read(&mut input_file).unwrap(); - - let message = dbin_file.messages.first().unwrap(); - - let message = BstreamBlock::decode(message.as_slice()).unwrap(); - - let block = Block::decode(message.payload_buffer.as_slice()).unwrap(); - - let trace = block - .transaction_traces - .iter() - .find(|t| t.index == 141) - .unwrap(); - - let transaction = TransactionSigned::try_from(trace).unwrap(); - - let tx_details = transaction.transaction; - - assert_eq!(tx_details.kind(), TxKind::Create); - assert_eq!(transaction.hash.as_slice(), trace.hash.as_slice()); -} diff --git a/crates/flat-head/src/era_verifier.rs b/crates/flat-head/src/era_verifier.rs index 3d549e4b..5dd7d60c 100644 --- a/crates/flat-head/src/era_verifier.rs +++ b/crates/flat-head/src/era_verifier.rs @@ -1,14 +1,13 @@ -use flat_files_decoder::decompression::Decompression; -use futures::stream::{FuturesOrdered, StreamExt}; -use tokio::task; - use firehose_protos::ethereum_v2::Block; +use flat_files_decoder::Compression; +use futures::stream::{FuturesOrdered, StreamExt}; use header_accumulator::{EraValidator, ExtHeaderRecord}; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, task}; use tree_hash::Hash256; use trin_validation::accumulator::PreMergeAccumulator; use crate::store::{self, Store}; + pub const MAX_EPOCH_SIZE: usize = 8192; pub const FINAL_EPOCH: usize = 1896; pub const MERGE_BLOCK: usize = 15537394; @@ -21,13 +20,13 @@ pub async fn verify_eras( compatible: Option, start_epoch: usize, end_epoch: Option, - decompress: Decompression, + compression: Compression, ) -> Result, anyhow::Error> { let mut validated_epochs = Vec::new(); let (tx, mut rx) = mpsc::channel(5); let blocks_store: store::Store = - store::new(store_url, decompress, compatible).expect("failed to create blocks store"); + store::new(store_url, compression, compatible).expect("failed to create blocks store"); for epoch in start_epoch..=end_epoch.unwrap_or(start_epoch + 1) { let tx = tx.clone(); @@ -35,7 +34,7 @@ pub async fn verify_eras( let store = blocks_store.clone(); task::spawn(async move { - match get_blocks_from_store(epoch, &store, decompress).await { + match get_blocks_from_store(epoch, &store, compression).await { Ok(blocks) => { let (successful_headers, _): (Vec<_>, Vec<_>) = blocks .iter() @@ -76,7 +75,7 @@ pub async fn verify_eras( async fn get_blocks_from_store( epoch: usize, store: &Store, - decompress: Decompression, + decompress: Compression, ) -> Result, anyhow::Error> { let start_100_block = epoch * MAX_EPOCH_SIZE; let end_100_block = (epoch + 1) * MAX_EPOCH_SIZE; @@ -96,7 +95,7 @@ async fn extract_100s_blocks( store: &Store, start_block: usize, end_block: usize, - decompress: Decompression, + decompress: Compression, ) -> Result, anyhow::Error> { // Flat files are stored in 100 block files // So we need to find the 100 block file that contains the start block and the 100 block file that contains the end block @@ -104,8 +103,8 @@ async fn extract_100s_blocks( let end_100_block = (((end_block as f32) / 100.0).ceil() as usize) * 100; let zst_extension = match decompress { - Decompression::Zstd => ".zst", - Decompression::None => "", + Compression::Zstd => ".zst", + Compression::None => "", }; let mut futs = FuturesOrdered::new(); diff --git a/crates/flat-head/src/main.rs b/crates/flat-head/src/main.rs index 2fe26dcc..0b37a07f 100644 --- a/crates/flat-head/src/main.rs +++ b/crates/flat-head/src/main.rs @@ -1,8 +1,7 @@ use std::env; use clap::{Parser, Subcommand}; - -use flat_files_decoder::decompression::Decompression; +use flat_files_decoder::Compression; use flat_head::era_verifier::verify_eras; use trin_validation::accumulator::PreMergeAccumulator; @@ -39,7 +38,7 @@ enum Commands { #[clap(short = 'c', long, default_value = "true")] // Where to decompress files from zstd or not. - decompress: Decompression, + decompress: Compression, #[clap(short = 'p', long)] // indicates if the store_url is compatible with some API. E.g., if `--compatible s3` is used, diff --git a/crates/flat-head/src/s3.rs b/crates/flat-head/src/s3.rs index 9ecaac1d..539f87ee 100644 --- a/crates/flat-head/src/s3.rs +++ b/crates/flat-head/src/s3.rs @@ -3,7 +3,7 @@ use header_accumulator::{Epoch, EraValidator, ExtHeaderRecord, MAX_EPOCH_SIZE}; use std::env; use trin_validation::accumulator::PreMergeAccumulator; -use flat_files_decoder::{decoder::handle_buf, decompression::Decompression}; +use flat_files_decoder::{read_blocks_from_reader, Compression}; use object_store::{aws::AmazonS3Builder, path::Path, ObjectStore}; @@ -58,7 +58,7 @@ pub async fn s3_fetch( let bytes = result.bytes().await.unwrap(); // Use `as_ref` to get a &[u8] from `bytes` and pass it to `handle_buf` - match handle_buf(bytes.as_ref(), Decompression::None) { + match read_blocks_from_reader(bytes.as_ref(), Compression::None) { Ok(blocks) => { let (successful_headers, _): (Vec<_>, Vec<_>) = blocks .iter() diff --git a/crates/flat-head/src/store.rs b/crates/flat-head/src/store.rs index f5fe46cf..13b61805 100644 --- a/crates/flat-head/src/store.rs +++ b/crates/flat-head/src/store.rs @@ -1,6 +1,6 @@ use anyhow::Context; use bytes::Bytes; -use flat_files_decoder::{decoder::handle_buf, decompression::Decompression}; +use flat_files_decoder::{read_blocks_from_reader, Compression}; use object_store::{ aws::AmazonS3Builder, gcp::GoogleCloudStorageBuilder, http::HttpBuilder, local::LocalFileSystem, path::Path, ClientOptions, ObjectStore, @@ -13,7 +13,7 @@ use firehose_protos::ethereum_v2::Block; pub fn new>( store_url: S, - decompress: Decompression, + decompress: Compression, compatible: Option, ) -> Result { let store_url = store_url.as_ref(); @@ -123,7 +123,7 @@ pub fn new>( pub struct Store { store: Arc, base: String, - decompress: Decompression, + decompress: Compression, } impl Store { @@ -158,9 +158,7 @@ impl ReadOptions { } } -async fn handle_from_bytes( - bytes: Bytes, - decompress: Decompression, -) -> Result, ReadError> { - handle_buf(bytes.as_ref(), decompress).map_err(|e| ReadError::DecodeError(e.to_string())) +async fn handle_from_bytes(bytes: Bytes, decompress: Compression) -> Result, ReadError> { + read_blocks_from_reader(bytes.as_ref(), decompress) + .map_err(|e| ReadError::DecodeError(e.to_string())) } diff --git a/crates/header-accumulator/tests/era_validator.rs b/crates/header-accumulator/tests/era_validator.rs index f3f58f66..5568899c 100644 --- a/crates/header-accumulator/tests/era_validator.rs +++ b/crates/header-accumulator/tests/era_validator.rs @@ -1,14 +1,21 @@ -use flat_files_decoder::{decoder::decode_flat_files, decompression::Decompression}; +use std::{fs::File, io::BufReader}; + +use flat_files_decoder::{read_blocks_from_reader, Compression}; use header_accumulator::{Epoch, EraValidateError, EraValidator, ExtHeaderRecord}; use tree_hash::Hash256; use trin_validation::accumulator::PreMergeAccumulator; +fn create_test_reader(path: &str) -> BufReader { + BufReader::new(File::open(path).unwrap()) +} + #[test] fn test_era_validate() -> Result<(), EraValidateError> { let mut headers: Vec = Vec::new(); for number in (0..=8200).step_by(100) { let file_name = format!("tests/ethereum_firehose_first_8200/{:010}.dbin", number); - let blocks = decode_flat_files(file_name, None, None, Decompression::None).unwrap(); + let reader = create_test_reader(&file_name); + let blocks = read_blocks_from_reader(reader, Compression::None).unwrap(); let successful_headers = blocks .iter() .cloned() diff --git a/crates/header-accumulator/tests/inclusion_proof.rs b/crates/header-accumulator/tests/inclusion_proof.rs index 0074159d..069c4c03 100644 --- a/crates/header-accumulator/tests/inclusion_proof.rs +++ b/crates/header-accumulator/tests/inclusion_proof.rs @@ -1,20 +1,26 @@ +use std::{fs::File, io::BufReader}; + use firehose_protos::ethereum_v2::Block; -use flat_files_decoder::{decoder::decode_flat_files, decompression::Decompression}; +use flat_files_decoder::{read_blocks_from_reader, Compression}; use header_accumulator::{ generate_inclusion_proof, verify_inclusion_proof, EraValidateError, ExtHeaderRecord, }; +fn create_test_reader(path: &str) -> BufReader { + BufReader::new(File::open(path).unwrap()) +} + #[test] fn test_inclusion_proof() -> Result<(), EraValidateError> { let mut headers: Vec = Vec::new(); let mut all_blocks: Vec = Vec::new(); // Vector to hold all blocks for flat_file_number in (0..=8200).step_by(100) { - let file_name = format!( + let file = format!( "tests/ethereum_firehose_first_8200/{:010}.dbin", flat_file_number ); - match decode_flat_files(file_name, None, None, Decompression::None) { + match read_blocks_from_reader(create_test_reader(&file), Compression::None) { Ok(blocks) => { headers.extend( blocks diff --git a/crates/header-accumulator/tests/utils.rs b/crates/header-accumulator/tests/utils.rs index f5abb948..f65511ca 100644 --- a/crates/header-accumulator/tests/utils.rs +++ b/crates/header-accumulator/tests/utils.rs @@ -1,13 +1,17 @@ +use std::{fs::File, io::BufReader}; + use ethportal_api::Header; -use flat_files_decoder::{decoder::decode_flat_files, decompression::Decompression}; +use flat_files_decoder::{read_blocks_from_reader, Compression}; + +fn create_test_reader(path: &str) -> BufReader { + BufReader::new(File::open(path).unwrap()) +} #[test] fn test_header_from_block() { - let blocks = decode_flat_files( - "tests/ethereum_firehose_first_8200/0000008200.dbin".to_string(), - None, - None, - Decompression::None, + let blocks = read_blocks_from_reader( + create_test_reader("tests/ethereum_firehose_first_8200/0000008200.dbin"), + Compression::None, ) .unwrap();