Skip to content

Commit

Permalink
Merge pull request #71 from NethermindEth/mu/preconf-distr
Browse files Browse the repository at this point in the history
Implement perconfirmation distribution across the P2P network
  • Loading branch information
mikhailUshakoff authored Aug 22, 2024
2 parents d8059d5 + 2147845 commit ffc10d8
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 31 deletions.
8 changes: 7 additions & 1 deletion Node/src/ethereum_l1/execution_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use alloy::{
providers::ProviderBuilder,
signers::{
local::{LocalSigner, PrivateKeySigner},
SignerSync,
Signature, SignerSync,
},
sol,
sol_types::SolValue,
Expand Down Expand Up @@ -286,6 +286,12 @@ impl ExecutionLayer {
Ok(signature.as_bytes())
}

pub fn recover_address_from_msg(&self, msg: &[u8], signature: &[u8]) -> Result<Address, Error> {
let signature = Signature::try_from(signature)?;
let address = signature.recover_address_from_msg(msg)?;
Ok(address)
}

pub async fn prove_incorrect_preconfirmation(
&self,
_block_id: u64,
Expand Down
1 change: 1 addition & 0 deletions Node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ async fn main() -> Result<(), Error> {
&config.taiko_proposer_url,
&config.taiko_driver_url,
config.block_proposed_receiver_timeout_sec,
config.taiko_chain_id,
));
let ethereum_l1 = Arc::new(
ethereum_l1::EthereumL1::new(
Expand Down
84 changes: 68 additions & 16 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use crate::{
},
mev_boost::MevBoost,
taiko::{l2_tx_lists::RPCReplyL2TxLists, Taiko},
utils::{block::Block, block_proposed::BlockProposed, commit::L2TxListsCommit},
utils::{
block_proposed::BlockProposed, commit::L2TxListsCommit,
preconfirmation_message::PreconfirmationMessage,
preconfirmation_proof::PreconfirmationProof,
},
};
use anyhow::{anyhow as any_err, Error};
use beacon_api_client::ProposerDuty;
Expand Down Expand Up @@ -34,7 +38,7 @@ pub struct Node {
validator_pubkey: String,
current_slot_to_preconf: Option<Slot>,
next_slot_to_preconf: Option<Slot>,
preconfirmed_blocks: Arc<Mutex<HashMap<u64, Block>>>,
preconfirmed_blocks: Arc<Mutex<HashMap<u64, PreconfirmationProof>>>,
}

impl Node {
Expand Down Expand Up @@ -78,6 +82,7 @@ impl Node {
fn start_new_msg_receiver_thread(&mut self) {
let preconfirmed_blocks = self.preconfirmed_blocks.clone();
let ethereum_l1 = self.ethereum_l1.clone();
let taiko = self.taiko.clone();
if let Some(node_rx) = self.node_rx.take() {
let p2p_to_node_rx = self.p2p_to_node_rx.take().unwrap();
tokio::spawn(async move {
Expand All @@ -86,6 +91,7 @@ impl Node {
p2p_to_node_rx,
preconfirmed_blocks,
ethereum_l1,
taiko,
)
.await;
});
Expand All @@ -97,8 +103,9 @@ impl Node {
async fn handle_incoming_messages(
mut node_rx: Receiver<BlockProposed>,
mut p2p_to_node_rx: Receiver<Vec<u8>>,
preconfirmed_blocks: Arc<Mutex<HashMap<u64, Block>>>,
preconfirmed_blocks: Arc<Mutex<HashMap<u64, PreconfirmationProof>>>,
ethereum_l1: Arc<EthereumL1>,
taiko: Arc<Taiko>,
) {
loop {
tokio::select! {
Expand All @@ -112,23 +119,58 @@ impl Node {
}
},
Some(p2p_message) = p2p_to_node_rx.recv() => {
let block: Block = p2p_message.into();
tracing::debug!("Node received message from p2p: {:?}", block);
// TODO: add block to preconfirmation queue
let msg: PreconfirmationMessage = p2p_message.into();
tracing::debug!("Node received message from p2p: {:?}", msg);
Self::check_preconfirmation_message(msg, &preconfirmed_blocks, ethereum_l1.clone(), taiko.clone()).await;
}
}
}
}

async fn check_preconfirmation_message(
msg: PreconfirmationMessage,
preconfirmed_blocks: &Arc<Mutex<HashMap<u64, PreconfirmationProof>>>,
ethereum_l1: Arc<EthereumL1>,
taiko: Arc<Taiko>
) {
tracing::debug!("Node received message from p2p: {:?}", msg);
// TODO check valid preconfer
// check hash
match L2TxListsCommit::from_preconf(msg.block_height, msg.tx_list_bytes, taiko.chain_id).hash() {
Ok(hash) => {
if hash == msg.proof.commit_hash {
// check signature
match ethereum_l1.execution_layer.recover_address_from_msg(&msg.proof.commit_hash, &msg.proof.signature) {
Ok(_) => {
// Add to preconfirmation map
preconfirmed_blocks.lock().await.insert(msg.block_height, msg.proof);
// Advance head
if let Err(e) = taiko.advance_head_to_new_l2_block(msg.tx_lists, msg.gas_used).await {
tracing::error!("Failed to advance head: {} for block_id: {}", e, msg.block_height);
}
}
Err(e) => {
tracing::error!("Failed to check signature: {} for block_id: {}", e, msg.block_height);
}
}
} else {
tracing::warn!("Preconfirmatoin hash is not correct for block_id: {}", msg.block_height);
}
}
Err(e) =>{
tracing::warn!("Failed to calculate hash: {}", e);
}
}
}

async fn check_preconfirmed_blocks_correctness(
preconfirmed_blocks: &Arc<Mutex<HashMap<u64, Block>>>,
preconfirmed_blocks: &Arc<Mutex<HashMap<u64, PreconfirmationProof>>>,
block_proposed: &BlockProposed,
ethereum_l1: Arc<EthereumL1>,
) -> Result<(), Error> {
let preconfirmed_blocks = preconfirmed_blocks.lock().await;
if let Some(block) = preconfirmed_blocks.get(&block_proposed.block_id) {
//TODO: verify the signature?

//Signature is already verified on precof insertion
if block.commit_hash != block_proposed.tx_list_hash {
info!(
"Block tx_list_hash is not correct for block_id: {}. Calling proof of incorrect preconfirmation.",
Expand Down Expand Up @@ -207,11 +249,18 @@ impl Node {
let (commit_hash, signature) =
self.generate_commit_hash_and_signature(&pending_tx_lists, new_block_height)?;

let new_block = Block {
let proof = PreconfirmationProof {
commit_hash,
signature,
};
self.send_preconfirmations_to_the_avs_p2p(new_block.clone())
let preconf_message = PreconfirmationMessage {
block_height: new_block_height,
tx_lists: pending_tx_lists.tx_lists.clone(),
tx_list_bytes: pending_tx_lists.tx_list_bytes[0].clone(), //TODO: handle rest tx lists
gas_used: self.gas_used,
proof: proof.clone(),
};
self.send_preconfirmations_to_the_avs_p2p(preconf_message.clone())
.await?;
self.taiko
.advance_head_to_new_l2_block(pending_tx_lists.tx_lists, self.gas_used)
Expand All @@ -228,7 +277,7 @@ impl Node {
self.preconfirmed_blocks
.lock()
.await
.insert(new_block_height, new_block);
.insert(new_block_height, proof);

Ok(())
}
Expand All @@ -239,7 +288,7 @@ impl Node {
reply: &RPCReplyL2TxLists,
block_height: u64,
) -> Result<([u8; 32], [u8; 65]), Error> {
let commit = L2TxListsCommit::new(reply, block_height);
let commit = L2TxListsCommit::new(reply, block_height, self.taiko.chain_id);
let hash = commit.hash()?;
let signature = self
.ethereum_l1
Expand All @@ -249,7 +298,7 @@ impl Node {
}

async fn clean_old_blocks(
preconfirmed_blocks: &Arc<Mutex<HashMap<u64, Block>>>,
preconfirmed_blocks: &Arc<Mutex<HashMap<u64, PreconfirmationProof>>>,
current_block_height: u64,
) -> Result<(), Error> {
let oldest_block_to_keep = current_block_height - OLDEST_BLOCK_DISTANCE;
Expand All @@ -265,9 +314,12 @@ impl Node {
.map(|duty| duty.slot)
}

async fn send_preconfirmations_to_the_avs_p2p(&self, block: Block) -> Result<(), Error> {
async fn send_preconfirmations_to_the_avs_p2p(
&self,
message: PreconfirmationMessage,
) -> Result<(), Error> {
self.node_to_p2p_tx
.send(block.into())
.send(message.into())
.await
.map_err(|e| any_err!("Failed to send message to node_to_p2p_tx: {}", e))
}
Expand Down
5 changes: 4 additions & 1 deletion Node/src/taiko/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ pub struct Taiko {
rpc_proposer: RpcClient,
rpc_driver: RpcClient,
rpc_driver_long_timeout: RpcClient,
pub chain_id: u64,
}

impl Taiko {
pub fn new(proposer_url: &str, driver_url: &str, long_timeout_sec: u64) -> Self {
pub fn new(proposer_url: &str, driver_url: &str, long_timeout_sec: u64, chain_id: u64) -> Self {
Self {
rpc_proposer: RpcClient::new(proposer_url),
rpc_driver: RpcClient::new(driver_url),
rpc_driver_long_timeout: RpcClient::new_with_timeout(
driver_url,
Duration::from_secs(long_timeout_sec),
),
chain_id,
}
}

Expand Down Expand Up @@ -156,6 +158,7 @@ mod test {
&format!("http://127.0.0.1:{}", port),
&format!("http://127.0.0.1:{}", port),
120,
1,
);
(rpc_server, taiko)
}
Expand Down
39 changes: 31 additions & 8 deletions Node/src/utils/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,40 @@ use secp256k1::{ecdsa::Signature, Message, Secp256k1, SecretKey};
use serde::{Deserialize, Serialize};
use tiny_keccak::{Hasher, Keccak};

//https://github.com/NethermindEth/Taiko-Preconf-AVS/blob/caf9fbbde0dd84947af5a7b26610ffd38525d932/SmartContracts/src/avs/PreconfTaskManager.sol#L175
#[derive(Serialize, Deserialize)]
pub struct L2TxListsCommit {
pub block_height: [u8; 32],
pub chain_id: [u8; 32],
pub tx_list_bytes: Vec<u8>,
pub parent_meta_hash: [u8; 32],
pub block_height: u64,
}

impl L2TxListsCommit {
pub fn new(reply: &RPCReplyL2TxLists, block_height: u64) -> Self {
pub fn new(reply: &RPCReplyL2TxLists, block_height: u64, chain_id: u64) -> Self {
let block_height_bytes = block_height.to_le_bytes(); // Convert u64 to a [u8; 8] array
let mut block_height = [0u8; 32];
block_height[24..].copy_from_slice(&block_height_bytes);
let chain_id_bytes = chain_id.to_le_bytes(); // Convert u64 to a [u8; 8] array
let mut chain_id = [0u8; 32];
chain_id[24..].copy_from_slice(&chain_id_bytes);
L2TxListsCommit {
block_height,
chain_id,
tx_list_bytes: reply.tx_list_bytes[0].clone(), // TODO check for other indexes
parent_meta_hash: reply.parent_meta_hash,
}
}

pub fn from_preconf(block_height: u64, tx_list_bytes: Vec<u8>, chain_id: u64) -> Self {
let block_height_bytes = block_height.to_le_bytes(); // Convert u64 to a [u8; 8] array
let mut block_height = [0u8; 32];
block_height[24..].copy_from_slice(&block_height_bytes);
let chain_id_bytes = chain_id.to_le_bytes(); // Convert u64 to a [u8; 8] array
let mut chain_id = [0u8; 32];
chain_id[24..].copy_from_slice(&chain_id_bytes);
L2TxListsCommit {
block_height,
chain_id,
tx_list_bytes,
}
}
}
Expand Down Expand Up @@ -49,8 +70,8 @@ mod tests {
fn test_hash() {
let commit = L2TxListsCommit {
tx_list_bytes: vec![1, 2, 3, 4, 5],
parent_meta_hash: [0u8; 32],
block_height: 1,
chain_id: [0u8; 32],
block_height: [0u8; 32],
};

let hash_result = commit.hash();
Expand All @@ -61,10 +82,12 @@ mod tests {

#[test]
fn test_sign() {
let mut block_height = [0u8; 32];
block_height[31] = 1;
let commit = L2TxListsCommit {
tx_list_bytes: vec![1, 2, 3, 4, 5],
parent_meta_hash: [0u8; 32],
block_height: 1,
chain_id: [0u8; 32],
block_height,
};

let private_key = "c87509a1c067bbde78beb793e6fa950b8d9c7f7bd5a8b16bf0d3a1a5b9bdfd3b";
Expand Down
13 changes: 13 additions & 0 deletions Node/src/utils/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub struct Config {
pub preconf_registry_expiry_sec: u64,
pub contract_addresses: ContractAddresses,
pub p2p_network_config: P2PNetworkConfig,
pub taiko_chain_id: u64,
}

#[derive(Debug)]
Expand Down Expand Up @@ -195,6 +196,17 @@ impl Config {
boot_nodes,
};

let taiko_chain_id = std::env::var("TAIKO_CHAIN_ID")
.expect("TAIKO_CHAIN_ID env variable must be set")
.parse::<u64>()
.map(|val| {
if val == 0 {
panic!("TAIKO_CHAIN_ID must be a positive number");
}
val
})
.expect("TAIKO_CHAIN_ID must be a number");

let config = Self {
taiko_proposer_url: std::env::var("TAIKO_PROPOSER_URL")
.unwrap_or("http://127.0.0.1:1234".to_string()),
Expand All @@ -214,6 +226,7 @@ impl Config {
preconf_registry_expiry_sec,
contract_addresses,
p2p_network_config,
taiko_chain_id,
};

info!(
Expand Down
3 changes: 2 additions & 1 deletion Node/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod block;
pub mod block_proposed;
pub mod commit;
pub mod config;
pub mod preconfirmation_message;
pub mod preconfirmation_proof;
pub mod rpc_client;
pub mod rpc_server;
24 changes: 24 additions & 0 deletions Node/src/utils/preconfirmation_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use crate::utils::preconfirmation_proof::PreconfirmationProof;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PreconfirmationMessage {
pub block_height: u64,
pub tx_lists: Value,
pub tx_list_bytes: Vec<u8>,
pub gas_used: u64,
pub proof: PreconfirmationProof,
}

impl From<PreconfirmationMessage> for Vec<u8> {
fn from(val: PreconfirmationMessage) -> Self {
bincode::serialize(&val).expect("Serialization failed")
}
}

impl From<Vec<u8>> for PreconfirmationMessage {
fn from(bytes: Vec<u8>) -> Self {
bincode::deserialize(&bytes).expect("Deserialization failed")
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Block {
pub struct PreconfirmationProof {
pub commit_hash: [u8; 32],
#[serde(with = "serde_bytes")]
pub signature: [u8; 65], // ECDSA 65 bytes signature
}

impl From<Block> for Vec<u8> {
fn from(val: Block) -> Self {
impl From<PreconfirmationProof> for Vec<u8> {
fn from(val: PreconfirmationProof) -> Self {
bincode::serialize(&val).expect("Serialization failed")
}
}

impl From<Vec<u8>> for Block {
impl From<Vec<u8>> for PreconfirmationProof {
fn from(bytes: Vec<u8>) -> Self {
bincode::deserialize(&bytes).expect("Deserialization failed")
}
Expand Down

0 comments on commit ffc10d8

Please sign in to comment.