From 18dec75834613c017a401ef0c5f16a96e93cf6da Mon Sep 17 00:00:00 2001 From: Michael Zaikin Date: Wed, 10 Jan 2024 15:45:05 +0200 Subject: [PATCH] Listener/spammer for remote benchmark. Genesis certificates for kernel --- .gitignore | 4 +- Cargo.lock | 6 +++ Makefile | 12 ++++- README.md | 41 +++++++++++++++++ crates/exporter/src/proto.rs | 8 +--- crates/pre-block/src/fixture.rs | 4 +- crates/pre-block/src/lib.rs | 57 +++++++++++++++++++++-- crates/pre-block/src/validator.rs | 39 ++++++++++------ kernel/build.rs | 21 +++++++-- kernel/tests/committee.ipynb | 28 ++++++------ sequencer/Cargo.toml | 1 + sequencer/src/da_batcher.rs | 20 --------- sequencer/src/fixture.rs | 70 +++++++++++++++++++++++++++++ sequencer/src/main.rs | 10 ++++- simple-listener/Cargo.toml | 3 ++ simple-listener/src/main.rs | 75 ++++++++++++++++++++++++------- simple-spammer/Cargo.toml | 4 ++ simple-spammer/src/main.rs | 36 ++++++++++----- 18 files changed, 344 insertions(+), 95 deletions(-) create mode 100644 sequencer/src/fixture.rs diff --git a/.gitignore b/.gitignore index 0cd47a7..db9e8f4 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,6 @@ bin/ .tezos-client # Jupyter checkpoints -.ipynb_checkpoints \ No newline at end of file +.ipynb_checkpoints + +dsn.env \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index ba70e27..5db02b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4595,6 +4595,7 @@ dependencies = [ "env_logger", "hex", "log", + "narwhal-config", "narwhal-types", "pre-block", "prost", @@ -4896,8 +4897,10 @@ version = "0.1.0" dependencies = [ "async-trait", "clap", + "env_logger", "fastcrypto", "futures", + "log", "prost", "protobuf-src", "roaring", @@ -4914,8 +4917,11 @@ version = "0.1.0" dependencies = [ "async-trait", "clap", + "env_logger", "fastcrypto", "futures", + "hex", + "log", "prost", "protobuf-src", "rand 0.8.5", diff --git a/Makefile b/Makefile index f502407..7b5d6da 100644 --- a/Makefile +++ b/Makefile @@ -56,6 +56,7 @@ run-operator: $(MAKE) build-operator $(MAKE) image-operator OCTEZ_TAG=$(OCTEZ_TAG) OCTEZ_PROTO=$(OCTEZ_PROTO) docker stop dsn-operator || true + docker volume rm dsn-operator docker run --rm -it \ --name dsn-operator \ --entrypoint=/bin/sh \ @@ -70,6 +71,7 @@ run-sequencer: RUST_LOG=info ./target/debug/sequencer run-dsn: + rm -rf ./db ./target/debug/launcher --id 1 --log-level 2 & ./target/debug/launcher --id 2 --log-level 0 & ./target/debug/launcher --id 3 --log-level 0 & @@ -90,4 +92,12 @@ stop-dsn-min: cd docker/setup/dsn-min-4-1 && docker-compose down -v broadcast: - curl -d '{"data":"deadbeef"}' -H "Content-Type: application/json" -X POST http://localhost:8080/broadcast \ No newline at end of file + curl -d '{"data":"deadbeef"}' -H "Content-Type: application/json" -X POST http://localhost:8080/broadcast + +run-listener: + cargo build --bin simple-listener + RUST_LOG=info ./target/debug/simple-listener --endpoint $(ENDPOINT) --from-id $(FROM_ID) + +run-spammer: + cargo build --bin simple-spammer + RUST_LOG=info ./target/debug/simple-spammer --endpoint $(ENDPOINT) --sleep $(SLEEP) \ No newline at end of file diff --git a/README.md b/README.md index 619755e..b33553e 100644 --- a/README.md +++ b/README.md @@ -12,3 +12,44 @@ The consensus part is based on the Narwhal codebase from [Sui](https://github.co 4. No transaction validation, as we assume a closed network; ![image](https://github.com/baking-bad/sequencer/assets/44951260/7f7604c9-af1b-4c57-8daa-c2d330979b7f) + +## Installation + +Install Rust toolchain: +``` +curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh +``` + +Add Wasm32 target: +``` +rustup target add wasm32-unknown-unknown +``` + +Install kernel build dependencies: +``` +make install +``` + +Install `protobuf` and `clang` system packages: +``` +sudo apt install protobuf-compiler clang +``` + +## How to run + +### Local consensus benchmark + +Check out the [instructions](./benchmark/README.md) + +### Remote consensus benchmark + + +### Local DSN setup + + + +#### Operator + +#### DSN + +#### Sequencer \ No newline at end of file diff --git a/crates/exporter/src/proto.rs b/crates/exporter/src/proto.rs index f5a6d58..6460a6d 100644 --- a/crates/exporter/src/proto.rs +++ b/crates/exporter/src/proto.rs @@ -1,4 +1,3 @@ -use roaring::RoaringBitmap; use std::collections::HashMap; use types::{BatchAPI, CertificateAPI, HeaderAPI}; @@ -58,7 +57,7 @@ impl Certificate { | types::SignatureVerificationState::Unsigned(bytes) => Vec::from(bytes.0), types::SignatureVerificationState::Genesis => Vec::new(), }, - signers: rb_to_bytes(&c.signed_authorities), + signers: c.signed_authorities.iter().map(|x| x as u8).collect(), }, _ => panic!("CertificateV1 is not expected"), } @@ -113,8 +112,3 @@ impl Header { } } -fn rb_to_bytes(rb: &RoaringBitmap) -> Vec { - let mut bytes = Vec::with_capacity(rb.serialized_size()); - rb.serialize_into(&mut bytes).unwrap(); - bytes -} diff --git a/crates/pre-block/src/fixture.rs b/crates/pre-block/src/fixture.rs index ad3a4c1..a246e6f 100644 --- a/crates/pre-block/src/fixture.rs +++ b/crates/pre-block/src/fixture.rs @@ -54,8 +54,8 @@ impl Default for NarwhalFixture { #[derive(Debug, Default)] pub struct SimpleStore { - latest_index: Option, - certificate_indexes: HashMap, + pub latest_index: Option, + pub certificate_indexes: HashMap, } impl PreBlockStore for SimpleStore { diff --git a/crates/pre-block/src/lib.rs b/crates/pre-block/src/lib.rs index a84235b..c010a97 100644 --- a/crates/pre-block/src/lib.rs +++ b/crates/pre-block/src/lib.rs @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: MIT -use std::collections::BTreeSet; +use std::collections::{BTreeSet, BTreeMap}; use digest::Blake2b256; use serde::{Deserialize, Serialize}; @@ -77,7 +77,7 @@ impl PreBlock { // NOTE that index is not enforced by any signature, so technically one can craft a // pre-block with a mismatched (sub dag) index. // The validation would fail either way, because of the parents check. - anyhow::bail!("Non-sequential index"); + anyhow::bail!("Non-sequential index: expected {}", self.index + 1); } // TODO: check that leader is actually leader — or is it implied by consensus? @@ -85,12 +85,17 @@ impl PreBlock { let digests: BTreeSet = self.certificates.iter().map(|cert| cert.digest()).collect(); + let mut missing: BTreeMap = BTreeMap::new(); - validate_certificate_chain(&self.leader, self.index, store, &digests)?; + validate_certificate_chain(&self.leader, config, self.index, store, &digests, &mut missing)?; for (idx, cert) in self.certificates.iter().enumerate() { - validate_certificate_chain(cert, self.index, store, &digests)?; + validate_certificate_chain(cert, config, self.index, store, &digests, &mut missing)?; validate_certificate_batches(cert, self.batches.get(idx).unwrap())?; + + if cert.header.round == 165 { + println!("{:?}", cert.header); + } } Ok(()) @@ -119,6 +124,18 @@ impl DsnConfig { pub fn quorum_threshold(&self) -> usize { self.authorities.len() * 2 / 3 + 1 } + + pub fn genesis(&self) -> Vec { + self.authorities + .iter() + .enumerate() + .map(|(i, _)| CertificateHeader { + epoch: self.epoch, + author: i as u16, + ..Default::default() + }.digest()) + .collect() + } } pub trait PreBlockStore { @@ -127,3 +144,35 @@ pub trait PreBlockStore { fn get_latest_index(&self) -> Option; fn set_latest_index(&mut self, index: u64); } + +#[cfg(test)] +mod tests { + use narwhal_crypto::traits::ToFromBytes; + use narwhal_test_utils::CommitteeFixture; + use narwhal_types::CertificateV2; + + use crate::DsnConfig; + + #[test] + fn test_genesis_certificate_digests() { + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + + let genesis_digests: Vec<[u8; 32]> = CertificateV2::genesis(&committee, true) + .iter() + .map(|cert| cert.header.digest().0) + .collect(); + + let config = DsnConfig::new( + 0, + fixture + .authorities() + .map(|auth| auth.public_key().as_bytes().to_vec()) + .collect(), + ); + + let digests = config.genesis(); + + pretty_assertions::assert_eq!(genesis_digests, digests); + } +} diff --git a/crates/pre-block/src/validator.rs b/crates/pre-block/src/validator.rs index fdd22ed..1cb0e28 100644 --- a/crates/pre-block/src/validator.rs +++ b/crates/pre-block/src/validator.rs @@ -2,7 +2,7 @@ // // SPDX-License-Identifier: MIT -use std::collections::BTreeSet; +use std::collections::{BTreeSet, BTreeMap}; use crate::{ bls_min_sig::aggregate_verify, digest::Blake2b256, Batch, Certificate, Digest, DsnConfig, @@ -26,7 +26,7 @@ pub fn validate_certificate_signature( } if cert.signers.len() < config.quorum_threshold() { - anyhow::bail!("Quorum is not met"); + anyhow::bail!("Quorum is not met (round #{})", cert.header.round); } let digest = cert.digest(); @@ -41,9 +41,11 @@ pub fn validate_certificate_signature( pub fn validate_certificate_chain( cert: &Certificate, + config: &DsnConfig, index: u64, store: &impl PreBlockStore, neighbors: &BTreeSet, + missing: &mut BTreeMap, ) -> anyhow::Result<()> { // We need to ensure the sub dag is: // 1) Not overlapping with the previous one @@ -53,23 +55,31 @@ pub fn validate_certificate_chain( // that every parent certificate is either: // 1) From this sub dag // 2) From a known sub dag (previous one) + // 3) Missing, but is not referenced by the majority for parent in cert.header.parents.iter() { if neighbors.contains(parent) { continue; } match store.get_certificate_index(parent) { - Some(prev_index) if prev_index + 1 != index => { - anyhow::bail!( - "Parent certificate is not from a preceding sub dag {}", - hex::encode(parent) - ) - } + // TODO: this does not hold for second sub-DAG (references genesis cert) + // Some(prev_index) if prev_index + 1 != index => { + // anyhow::bail!( + // "Parent certificate is not from a preceding sub dag {} (round #{})", + // hex::encode(parent), + // cert.header.round, + // ) + // } None => { - anyhow::bail!( - "Parent certificate cannot be not found {}", - hex::encode(parent) - ); + let num_misses = missing.get(parent).unwrap_or(&0usize) + 1; + if num_misses >= config.quorum_threshold() { + anyhow::bail!( + "Parent certificate cannot be not found {} (round #{}, num misses {})", + hex::encode(parent), + cert.header.round, + num_misses, + ); + } } _ => (), } @@ -85,9 +95,10 @@ pub fn validate_certificate_batches(cert: &Certificate, batches: &[Batch]) -> an let digest = batch.digest(); if !digests.contains(&digest) { anyhow::bail!( - "Invalid batch content (digest mismatch), idx = {}, digest = {}", + "Invalid batch content (digest mismatch), idx = {}, digest = {}, round = {}", i, - hex::encode(&digest) + hex::encode(&digest), + cert.header.round, ); } } diff --git a/kernel/build.rs b/kernel/build.rs index b77e288..53fc295 100644 --- a/kernel/build.rs +++ b/kernel/build.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: MIT use narwhal_config::{Committee, Import}; -use pre_block::{fixture::NarwhalFixture, PublicKey}; +use pre_block::{fixture::NarwhalFixture, PublicKey, DsnConfig}; use serde::{Deserialize, Serialize}; use std::path::PathBuf; @@ -48,9 +48,9 @@ fn real_setup() -> std::result::Result> .authorities() .map(|auth| auth.protocol_key_bytes().0.to_vec()) .collect(); - let value = bcs::to_bytes(&authorities)?; + let authorities_value = bcs::to_bytes(&authorities)?; - let setup = KernelSetup { + let mut setup = KernelSetup { instructions: vec![ // Instruction { // set: Set { @@ -66,6 +66,21 @@ fn real_setup() -> std::result::Result> }, ], }; + + let config = DsnConfig { + epoch: 0, + authorities + }; + + for digest in config.genesis() { + setup.instructions.push(Instruction { + set: Set { + value: hex::encode(&(0u64.to_be_bytes())), + to: format!("/certificates/{}", hex::encode(&digest)), + }, + }); + } + Ok(setup) } diff --git a/kernel/tests/committee.ipynb b/kernel/tests/committee.ipynb index 257d4ba..a10c81e 100644 --- a/kernel/tests/committee.ipynb +++ b/kernel/tests/committee.ipynb @@ -130,25 +130,25 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 32, "id": "abcd7d4d", "metadata": {}, "outputs": [], "source": [ - "ROLLUP_ADDRESS = 'sr1MYBhyxFuxVxFWSdWMVSgVG8CHW2AWnbUn'\n", + "ROLLUP_ADDRESS = 'sr1KX68s61pKtbwwmnucAEeE1YjFdNCRLc2p' # PASTE YOUR ROLLUP ADDRESS\n", "DSN_CONFIG = '000000000000000007608217efb0d19cb9960f856319d25c3d57c6829661533f489c7db7e33d15a0195012085a2f3874b91e43ed910fb2975d6918db3d63193bf39af40c69ce5759cad9794fa64b55f7c9d092498aaf4f81ba7a4b865e8f51cd69a69669c8cd8e9304ca608f613bf3261ebe81983a54cbbb75a12faf3cbe41f68a7b6586ad5000d4a2228cb867528cc15fed6573af42359a0836bb08aac00debbf0d4074e82301a1cf914e78fdc13d80852b39d88b24c4be9a77fbd73b0f8bcff5e44e64896691e33bc1af60951406b2856fdabbe0e6bdc72c5b7aac777b7d24fc9022b5bc0078beba5b5d8a4e7b54c716d948db02ed6861b88d4e201970415c6e3a5512da895c2ec93fd686ec9f3bf190fe9bff5c8739446b4036d7ec8bcf3ae9937a1e954ab8d7fbea05d960a78869789da72a1e2b7b8b4703bd88b32479f0511533cc9172ba5c297bd45b26bc6239820fafca91504095153aeb8ffa08bf625e77703ab559894076789f224893593e7889cbb5db03d80180aeb7d2e76d540e6255f4a6f03d5ba14aa244354760ac192182731d3655685699102338ce29d07a988c61bd1c64ef18b07ab69254670abfeaf8f6d114cc2994b3cb7a43987110519d019a2fc6a5ad4f6c0afbb4aafaeb46a6cf4dbae2d548aa23036065fa1d2871cd8e7a887756801cc94cf7eb0f5060b6c9baa74c09e784b9570f63652e42b924fb76edfc0d796dc4326b200ebf3bea90c373a68fb73b761bf37a6b8bbc5829016226c8a21767eeb7b625b0390933da52b0792c0209bd736a15625168ea7b75af730ae1cd452d6b86dc0c2529b1384860b8f63f5c29dace07ca215ac250ea4afc14197c5230706b4711b6500b7bc4be11462897d80aa1e67979c5a1a196031edf06e282a03acd4dc09eebb6d4fe07d1950b4566d2fa5ab7cfb79882443068871334d2cd2a664bd5bda6c65bdc6eee01a0'" ] }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 33, "id": "a67718a4", "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "\n", + "\n", "\n", "Properties\n", ".key\t\ttz1grSQDByRpnVs7sPtaprNZRp531ZKz6Jmm\n", @@ -156,24 +156,24 @@ ".block_id\thead\n", "\n", "Payload\n", - "{'branch': 'BLW1HPDPVMyX6tkPWwUsnK7obv1DVzCme85A1hbC2udT1utRTRU',\n", + "{'branch': 'BMB7EUj12DK7e8HxPpP98n9kxBQMmGmaS67scJ4fUayd2XSdBcb',\n", " 'contents': [{'amount': '0',\n", - " 'counter': '1281',\n", + " 'counter': '1293',\n", " 'destination': 'KT1S2uEmcLsTP9tMr5KQkuHFARTyTijL7UEV',\n", - " 'fee': '1100',\n", - " 'gas_limit': '1084',\n", + " 'fee': '1108',\n", + " 'gas_limit': '1085',\n", " 'kind': 'transaction',\n", " 'parameters': {'entrypoint': 'default',\n", - " 'value': {'args': [{'string': 'sr1QMThifta6GxVB6roeMcwMKyJfwoXjx2Jx'},\n", - " {'bytes': '07608217efb0d19cb9960f856319d25c3d57c6829661533f489c7db7e33d15a0195012085a2f3874b91e43ed910fb2975d6918db3d63193bf39af40c69ce5759cad9794fa64b55f7c9d092498aaf4f81ba7a4b865e8f51cd69a69669c8cd8e9304ca608f613bf3261ebe81983a54cbbb75a12faf3cbe41f68a7b6586ad5000d4a2228cb867528cc15fed6573af42359a0836bb08aac00debbf0d4074e82301a1cf914e78fdc13d80852b39d88b24c4be9a77fbd73b0f8bcff5e44e64896691e33bc1af60951406b2856fdabbe0e6bdc72c5b7aac777b7d24fc9022b5bc0078beba5b5d8a4e7b54c716d948db02ed6861b88d4e201970415c6e3a5512da895c2ec93fd686ec9f3bf190fe9bff5c8739446b4036d7ec8bcf3ae9937a1e954ab8d7fbea05d960a78869789da72a1e2b7b8b4703bd88b32479f0511533cc9172ba5c297bd45b26bc6239820fafca91504095153aeb8ffa08bf625e77703ab559894076789f224893593e7889cbb5db03d80180aeb7d2e76d540e6255f4a6f03d5ba14aa244354760ac192182731d3655685699102338ce29d07a988c61bd1c64ef18b07ab69254670abfeaf8f6d114cc2994b3cb7a43987110519d019a2fc6a5ad4f6c0afbb4aafaeb46a6cf4dbae2d548aa23036065fa1d2871cd8e7a887756801cc94cf7eb0f5060b6c9baa74c09e784b9570f63652e42b924fb76edfc0d796dc4326b200ebf3bea90c373a68fb73b761bf37a6b8bbc5829016226c8a21767eeb7b625b0390933da52b0792c0209bd736a15625168ea7b75af730ae1cd452d6b86dc0c2529b1384860b8f63f5c29dace07ca215ac250ea4afc14197c5230706b4711b6500b7bc4be11462897d80aa1e67979c5a1a196031edf06e282a03acd4dc09eebb6d4fe07d1950b4566d2fa5ab7cfb79882443068871334d2cd2a664bd5bda6c65bdc6eee01a0'}],\n", + " 'value': {'args': [{'string': 'sr1KX68s61pKtbwwmnucAEeE1YjFdNCRLc2p'},\n", + " {'bytes': '000000000000000007608217efb0d19cb9960f856319d25c3d57c6829661533f489c7db7e33d15a0195012085a2f3874b91e43ed910fb2975d6918db3d63193bf39af40c69ce5759cad9794fa64b55f7c9d092498aaf4f81ba7a4b865e8f51cd69a69669c8cd8e9304ca608f613bf3261ebe81983a54cbbb75a12faf3cbe41f68a7b6586ad5000d4a2228cb867528cc15fed6573af42359a0836bb08aac00debbf0d4074e82301a1cf914e78fdc13d80852b39d88b24c4be9a77fbd73b0f8bcff5e44e64896691e33bc1af60951406b2856fdabbe0e6bdc72c5b7aac777b7d24fc9022b5bc0078beba5b5d8a4e7b54c716d948db02ed6861b88d4e201970415c6e3a5512da895c2ec93fd686ec9f3bf190fe9bff5c8739446b4036d7ec8bcf3ae9937a1e954ab8d7fbea05d960a78869789da72a1e2b7b8b4703bd88b32479f0511533cc9172ba5c297bd45b26bc6239820fafca91504095153aeb8ffa08bf625e77703ab559894076789f224893593e7889cbb5db03d80180aeb7d2e76d540e6255f4a6f03d5ba14aa244354760ac192182731d3655685699102338ce29d07a988c61bd1c64ef18b07ab69254670abfeaf8f6d114cc2994b3cb7a43987110519d019a2fc6a5ad4f6c0afbb4aafaeb46a6cf4dbae2d548aa23036065fa1d2871cd8e7a887756801cc94cf7eb0f5060b6c9baa74c09e784b9570f63652e42b924fb76edfc0d796dc4326b200ebf3bea90c373a68fb73b761bf37a6b8bbc5829016226c8a21767eeb7b625b0390933da52b0792c0209bd736a15625168ea7b75af730ae1cd452d6b86dc0c2529b1384860b8f63f5c29dace07ca215ac250ea4afc14197c5230706b4711b6500b7bc4be11462897d80aa1e67979c5a1a196031edf06e282a03acd4dc09eebb6d4fe07d1950b4566d2fa5ab7cfb79882443068871334d2cd2a664bd5bda6c65bdc6eee01a0'}],\n", " 'prim': 'Pair'}},\n", " 'source': 'tz1grSQDByRpnVs7sPtaprNZRp531ZKz6Jmm',\n", " 'storage_limit': '100'}],\n", " 'protocol': 'PtNairobiyssHuh87hEhfVBGCVrK3WnS8Z2FT4ymB5tAa4r1nQf',\n", - " 'signature': 'sigQmHq5duTrAVBmi57z6PUuSfWmLJ89o3oW4QuwuTZN6msHYbed6hMsUYGBYMKN4moqaxzV4S1NHF59fyiLgPMB4N8FdD1C'}\n", + " 'signature': 'sigfocdLJrKhXPRKshyr7RKjokR2N83c8di7kpsVA4mpVZjwSHEzYyD6JsAbtQvVvHGmzteroix4yhwtUrBgGch36iHsRLZz'}\n", "\n", "Hash\n", - "opA67KNL6pG79dhQWTpKhuj9rnvN6sUzXvMju6zNMNxxGq6FvwG\n", + "onezC2CYE1AT7en3prLVCZy4nHuhiw9paX8HBf4kh2ar62ayXpg\n", ".activate_account()\n", ".autofill()\n", ".ballot()\n", @@ -209,13 +209,13 @@ ".transfer_ticket()" ] }, - "execution_count": 12, + "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "dao.default((ROLLUP_ADDRESS, AUTHORITIES)).send()" + "dao.default((ROLLUP_ADDRESS, DSN_CONFIG)).send()" ] }, { diff --git a/sequencer/Cargo.toml b/sequencer/Cargo.toml index 39416e5..30552ca 100644 --- a/sequencer/Cargo.toml +++ b/sequencer/Cargo.toml @@ -27,6 +27,7 @@ env_logger = "0.10.0" log = "0.4" axum = "0.7.3" +narwhal-config.workspace = true narwhal-types.workspace = true pre-block = { workspace = true, features = ["conversions"] } diff --git a/sequencer/src/da_batcher.rs b/sequencer/src/da_batcher.rs index 8629100..52f0507 100644 --- a/sequencer/src/da_batcher.rs +++ b/sequencer/src/da_batcher.rs @@ -3,7 +3,6 @@ // SPDX-License-Identifier: MIT use log::info; -use pre_block::fixture::NarwhalFixture; use pre_block::PreBlock; use serde::Serialize; use std::{sync::mpsc, time::Duration}; @@ -65,25 +64,6 @@ pub fn batch_encode_to( Ok(()) } -pub async fn generate_pre_blocks( - prev_index: u64, - pre_blocks_tx: mpsc::Sender, -) -> anyhow::Result<()> { - let mut index = prev_index; - let mut fixture = NarwhalFixture::default(); - - loop { - let pre_block = fixture.next_pre_block(1); - if pre_block.index() == index { - info!("[DA fetch] received pre-block #{}", index); - pre_blocks_tx.send(pre_block)?; - index += 1; - - tokio::time::sleep(Duration::from_secs(1)).await; - } - } -} - pub fn is_leader(level: u32, node_id: u8) -> bool { if level % 2 == 0 { (level / 2) % (node_id as u32) == 0 diff --git a/sequencer/src/fixture.rs b/sequencer/src/fixture.rs new file mode 100644 index 0000000..ee25f00 --- /dev/null +++ b/sequencer/src/fixture.rs @@ -0,0 +1,70 @@ +// SPDX-FileCopyrightText: 2023 Baking Bad +// +// SPDX-License-Identifier: MIT + +use narwhal_config::{Committee, Import}; +use pre_block::fixture::{NarwhalFixture, SimpleStore}; +use pre_block::{PreBlock, PublicKey, DsnConfig}; +use std::path::PathBuf; +use std::sync::mpsc; +use std::time::Duration; +use log::info; + +pub async fn generate_pre_blocks( + prev_index: u64, + pre_blocks_tx: mpsc::Sender, +) -> anyhow::Result<()> { + let mut index = prev_index; + let mut fixture = NarwhalFixture::default(); + + loop { + let pre_block = fixture.next_pre_block(1); + if pre_block.index() == index { + info!("[DA fetch] received pre-block #{}", index); + pre_blocks_tx.send(pre_block)?; + index += 1; + + tokio::time::sleep(Duration::from_secs(1)).await; + } + } +} + +pub async fn verify_pre_blocks( + pre_blocks_rx: mpsc::Receiver, +) -> anyhow::Result<()> { + let mut committee_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + committee_path.push("../launcher/defaults/committee.json"); + + let mut committee = Committee::import(committee_path.as_os_str().to_str().unwrap())?; + committee.load(); + + let authorities: Vec = committee + .authorities() + .map(|auth| auth.protocol_key_bytes().0.to_vec()) + .collect(); + + let config = DsnConfig { + epoch: 0, + authorities + }; + + let mut store = SimpleStore { + latest_index: Some(0), + certificate_indexes: config + .genesis() + .into_iter() + .map(|digest| (digest, 0u64)) + .collect() + }; + + loop { + while let Ok(pre_block) = pre_blocks_rx.try_recv() { + info!("[DA verifier] Pending #{}", pre_block.index()); + pre_block.verify(&config, &mut store)?; + pre_block.commit(&mut store); + info!("[DA verifier] OK #{}", pre_block.index()); + } + + tokio::time::sleep(Duration::from_secs(1)).await; + } +} diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index 36fbe53..db77ad3 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -19,12 +19,13 @@ use std::time::Duration; use tokio::signal; use tokio::task::JoinHandle; -use crate::consensus_client::PrimaryClient; +use crate::{consensus_client::PrimaryClient, fixture::verify_pre_blocks}; use crate::da_batcher::publish_pre_blocks; mod consensus_client; mod da_batcher; mod rollup_client; +mod fixture; #[derive(Clone)] struct AppState { @@ -160,7 +161,7 @@ async fn run_da_task( let mut primary_client = PrimaryClient::new(primary_node_url); loop { - let from_id = rollup_client.get_next_index().await?; + let from_id = 1; // rollup_client.get_next_index().await?; let (tx, rx) = mpsc::channel(); info!("[DA task] Starting from index #{}", from_id); @@ -170,6 +171,11 @@ async fn run_da_task( error!("[DA fetch] Failed with: {}", err); } }, + // res = verify_pre_blocks(rx) => { + // if let Err(err) = res { + // error!("[DA verify] Failed with: {}", err); + // } + // }, res = publish_pre_blocks(&rollup_client, &smart_rollup_address, node_id, rx) => { if let Err(err) = res { error!("[DA publish] Failed with: {}", err); diff --git a/simple-listener/Cargo.toml b/simple-listener/Cargo.toml index 6499f6d..ff9ace7 100644 --- a/simple-listener/Cargo.toml +++ b/simple-listener/Cargo.toml @@ -15,6 +15,9 @@ futures.workspace = true fastcrypto.workspace = true roaring.workspace = true +env_logger = "0.10.0" +log = "0.4" + [target.'cfg(not(target_env = "msvc"))'.build-dependencies] protobuf-src.workspace = true diff --git a/simple-listener/src/main.rs b/simple-listener/src/main.rs index 7d25272..dd3a03b 100644 --- a/simple-listener/src/main.rs +++ b/simple-listener/src/main.rs @@ -2,7 +2,8 @@ use clap::Parser; use std::time::Duration; use tokio::time::sleep; use tonic::transport::Channel; - +use log::{error, info}; +use std::time::{SystemTime, UNIX_EPOCH}; mod exporter { tonic::include_proto!("exporter"); } @@ -24,24 +25,25 @@ struct Args { #[tokio::main] async fn main() { + env_logger::init(); let args = Args::parse(); loop { - println!("Connecting to {}...", args.endpoint.clone()); + info!("Connecting to {}...", args.endpoint.clone()); match connect(args.endpoint.clone()).await { Ok(client) => { - println!("Connected. Exporting subdags from #{}...", args.from_id); + info!("Connected. Exporting subdags from #{}...", args.from_id); match export(client, args.from_id).await { Ok(_) => { - println!("Exit"); + info!("Exit"); break; } Err(e) => { - println!("Failed to export: {}", e); + error!("Failed to export: {}", e); } } } Err(e) => { - println!("Failed to connect: {}", e); + error!("Failed to connect: {}", e); } } sleep(Duration::from_secs(1)).await; @@ -59,23 +61,64 @@ async fn export( let mut stream = client.export(ExportRequest { from_id }).await?.into_inner(); while let Some(subdag) = stream.message().await? { - println!( - "Received subdag #{} with {} txs", - subdag.id, - total_txs(&subdag) - ); + let stats = stats(&subdag); + if stats.num_txs > 0 { + info!( + "Received subdag #{} (num txs {}, payload size {}, avg latency {} ms, cert time delta {} ms)", + subdag.id, + stats.num_txs, + stats.payload_size, + stats.avg_latency, + stats.cert_time_delta, + ); + } else { + info!( + "Empty subdag #{}", + subdag.id, + ); + } } - println!("Close client"); + info!("Close client"); Ok(()) } -fn total_txs(subdag: &SubDag) -> usize { - let mut cnt = 0; +#[derive(Debug)] +pub struct Stats { + pub subdag_time: u128, + pub num_txs: usize, + pub payload_size: usize, + pub avg_latency: u128, + pub cert_time_delta: u128, +} + +fn stats(subdag: &SubDag) -> Stats { + let subdag_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis(); + let mut num_txs = 0; + let mut payload_size = 0; + let mut sum_latency = 0; + + let first_cert_ts = subdag.certificates[0].clone().header.unwrap().created_at as u128; + let last_cert_ts = subdag.leader.clone().unwrap().header.unwrap().created_at as u128; + for payload in subdag.payloads.iter() { for batch in payload.batches.iter() { - cnt += batch.transactions.len(); + num_txs += batch.transactions.len(); + for tx in batch.transactions.iter() { + payload_size += tx.len(); + // FIXME: handle parsing errors + let tx_time_bytes: [u8; 16] = tx[..16].try_into().unwrap(); + let tx_time = u128::from_be_bytes(tx_time_bytes); + sum_latency += subdag_time - tx_time; + } } } - cnt + + Stats { + subdag_time, + num_txs, + payload_size, + avg_latency: if num_txs > 0 { sum_latency / (num_txs as u128) } else { 0 }, + cert_time_delta: last_cert_ts - first_cert_ts, + } } diff --git a/simple-spammer/Cargo.toml b/simple-spammer/Cargo.toml index b603613..42d6c5b 100644 --- a/simple-spammer/Cargo.toml +++ b/simple-spammer/Cargo.toml @@ -16,6 +16,10 @@ fastcrypto.workspace = true roaring.workspace = true rand.workspace = true +env_logger = "0.10.0" +log = "0.4" +hex = "*" + [target.'cfg(not(target_env = "msvc"))'.build-dependencies] protobuf-src.workspace = true diff --git a/simple-spammer/src/main.rs b/simple-spammer/src/main.rs index 3dba9aa..0457aff 100644 --- a/simple-spammer/src/main.rs +++ b/simple-spammer/src/main.rs @@ -3,6 +3,8 @@ use rand::Rng; use std::time::Duration; use tokio::time::sleep; use tonic::transport::Channel; +use log::{error, info}; +use std::time::{SystemTime, UNIX_EPOCH}; mod narwhal { tonic::include_proto!("narwhal"); @@ -19,35 +21,47 @@ struct Args { #[arg(short, long, default_value_t=("http://127.0.0.1:64013".parse()).unwrap())] endpoint: String, /// Sleep duration, ms - #[arg(short, long, default_value_t = 100)] + #[arg(short, long, default_value_t = 1000)] sleep: u64, - /// Sleep duration, ms - #[arg(long, default_value_t = 10)] + /// Min transaction size, bytes + #[arg(long, default_value_t = 1024)] min_size: u32, - /// Sleep duration, ms - #[arg(long, default_value_t = 100)] + /// Max transaction size, bytes + #[arg(long, default_value_t = 32768)] max_size: u32, } #[tokio::main] async fn main() { + env_logger::init(); let args = Args::parse(); let mut rng = rand::thread_rng(); loop { - println!("Connecting to {}...", args.endpoint.clone()); + info!("Connecting to {}...", args.endpoint.clone()); match connect(args.endpoint.clone()).await { Ok(mut client) => { - let transaction: Vec = (1..rng.gen_range(args.min_size..args.max_size)) + let payload: Vec = (1..rng.gen_range(args.min_size..args.max_size)) .map(|_| rng.gen_range(0..255)) .collect(); - println!("Connected. Sending transaction {:?}", transaction); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + let timestamp_bytes = timestamp.to_be_bytes().to_vec(); + let transaction = [timestamp_bytes, payload].concat(); + + info!( + "Connected. Sending transaction (size {}, timestamp {} ms)", + transaction.len(), + timestamp, + ); match client.submit_transaction(Transaction { transaction }).await { - Ok(_) => println!("Done. Sleep for {}ms", args.sleep), - Err(e) => println!("Failed to send transaction: {}", e), + Ok(_) => info!("Done. Sleep for {}ms", args.sleep), + Err(e) => error!("Failed to send transaction: {}", e,), } } Err(e) => { - println!("Failed to connect: {}", e); + error!("Failed to connect: {}", e); } } sleep(Duration::from_millis(args.sleep)).await;