From a1327d3c8c8679b45bd30864779cf961339aae4b Mon Sep 17 00:00:00 2001 From: Maciej Skrzypkowski Date: Wed, 26 Jun 2024 10:36:49 +0200 Subject: [PATCH] submit new l2 blocks using RPC (#28) * submit_new_l2_blocks implemented * fixed tests * Fix loops in Node/src/node/mod.rs to be consistent (#38) every 4 seconds and not blocked by channel processing Co-authored-by: Ahmad Bitar * Two RCP addresses for the Node fixed some JSON RPC compatibility with taiko-clients * unused import * small fixes --------- Co-authored-by: Ahmad Bitar <33181301+smartprogrammer93@users.noreply.github.com> Co-authored-by: Ahmad Bitar --- Node/src/node/mod.rs | 63 ++++++++++++------ Node/src/p2p_network/mod.rs | 2 +- Node/src/taiko/mod.rs | 103 ++++++++++++++++++++++++----- Node/src/utils/rpc_server.rs | 123 ++++++++++++++++++----------------- 4 files changed, 192 insertions(+), 99 deletions(-) diff --git a/Node/src/node/mod.rs b/Node/src/node/mod.rs index f2250b0..7f439bf 100644 --- a/Node/src/node/mod.rs +++ b/Node/src/node/mod.rs @@ -1,20 +1,22 @@ use crate::taiko::Taiko; -use anyhow::{anyhow as err, Context, Error}; +use anyhow::{anyhow as any_err, Error, Ok}; use tokio::sync::mpsc::{Receiver, Sender}; pub struct Node { taiko: Taiko, - node_rx: Receiver, + node_rx: Option>, avs_p2p_tx: Sender, + gas_used: u64, } impl Node { pub fn new(node_rx: Receiver, avs_p2p_tx: Sender) -> Self { - let taiko = Taiko::new("http://127.0.0.1:1234"); + let taiko = Taiko::new("http://127.0.0.1:1234", "http://127.0.0.1:1235"); Self { taiko, - node_rx, + node_rx: Some(node_rx), avs_p2p_tx, + gas_used: 0, } } @@ -22,35 +24,54 @@ impl Node { /// one for handling incoming messages and one for the block preconfirmation pub async fn entrypoint(mut self) -> Result<(), Error> { tracing::info!("Starting node"); + self.start_new_msg_receiver_thread(); + self.preconfirmation_loop().await; + Ok(()) + } + + fn start_new_msg_receiver_thread(&mut self) { + if let Some(node_rx) = self.node_rx.take() { + tokio::spawn(async move { + Self::handle_incoming_messages(node_rx).await; + }); + } else { + tracing::error!("node_rx has already been moved"); + } + } + + async fn handle_incoming_messages(mut node_rx: Receiver) { loop { - if let Err(err) = self.step().await { - tracing::error!("Node processing step failed: {}", err); + tokio::select! { + Some(message) = node_rx.recv() => { + tracing::debug!("Node received message: {}", message); + }, } } } - async fn step(&mut self) -> Result<(), Error> { - if let Ok(msg) = self.node_rx.try_recv() { - self.process_incoming_message(msg).await?; - } else { - self.main_block_preconfirmation_step().await?; + async fn preconfirmation_loop(&self) { + loop { + let start_time = tokio::time::Instant::now(); + if let Err(err) = self.main_block_preconfirmation_step().await { + tracing::error!("Failed to execute main block preconfirmation step: {}", err); + } + let elapsed = start_time.elapsed(); + let sleep_duration = std::time::Duration::from_secs(4).saturating_sub(elapsed); + tokio::time::sleep(sleep_duration).await; } - Ok(()) } async fn main_block_preconfirmation_step(&self) -> Result<(), Error> { - self.taiko + let pending_tx_lists = self + .taiko .get_pending_l2_tx_lists() .await - .context("Failed to get pending l2 tx lists")?; + .map_err(Error::from)?; self.commit_to_the_tx_lists(); self.send_preconfirmations_to_the_avs_p2p().await?; - self.taiko.submit_new_l2_blocks(); - Ok(()) - } - - async fn process_incoming_message(&mut self, msg: String) -> Result<(), Error> { - tracing::debug!("Node received message: {}", msg); + self.taiko + .advance_head_to_new_l2_block(pending_tx_lists, self.gas_used) + .await?; Ok(()) } @@ -62,6 +83,6 @@ impl Node { self.avs_p2p_tx .send("Hello from node!".to_string()) .await - .map_err(|e| err!("Failed to send message to avs_p2p_tx: {}", e)) + .map_err(|e| any_err!("Failed to send message to avs_p2p_tx: {}", e)) } } diff --git a/Node/src/p2p_network/mod.rs b/Node/src/p2p_network/mod.rs index 4a775eb..d795032 100644 --- a/Node/src/p2p_network/mod.rs +++ b/Node/src/p2p_network/mod.rs @@ -26,7 +26,7 @@ impl AVSp2p { .send("Hello from avs p2p!".to_string()) .await .unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; } }); diff --git a/Node/src/taiko/mod.rs b/Node/src/taiko/mod.rs index 98aee38..f48c2e9 100644 --- a/Node/src/taiko/mod.rs +++ b/Node/src/taiko/mod.rs @@ -3,25 +3,38 @@ use anyhow::Error; use serde_json::Value; pub struct Taiko { - rpc_client: RpcClient, + rpc_proposer: RpcClient, + rpc_driver: RpcClient, } impl Taiko { - pub fn new(url: &str) -> Self { + pub fn new(proposer_url: &str, driver_url: &str) -> Self { Self { - rpc_client: RpcClient::new(url), + rpc_proposer: RpcClient::new(proposer_url), + rpc_driver: RpcClient::new(driver_url), } } pub async fn get_pending_l2_tx_lists(&self) -> Result { tracing::debug!("Getting L2 tx lists"); - self.rpc_client + self.rpc_proposer .call_method("RPC.GetL2TxLists", vec![]) .await } - pub fn submit_new_l2_blocks(&self) { + pub async fn advance_head_to_new_l2_block( + &self, + tx_lists: Value, + gas_used: u64, + ) -> Result { tracing::debug!("Submitting new L2 blocks"); + let payload = serde_json::json!({ + "TxLists": tx_lists["TxLists"], + "gasUsed": gas_used, + }); + self.rpc_driver + .call_method("RPC.AdvanceL2ChainHeadWithNewBlocks", vec![payload]) + .await } } @@ -35,22 +48,76 @@ mod test { async fn test_get_pending_l2_tx_lists() { tracing_subscriber::fmt::init(); + let (mut rpc_server, taiko) = setup_rpc_server_and_taiko(3030).await; + let json = taiko.get_pending_l2_tx_lists().await.unwrap(); + + assert_eq!(json["TxLists"].as_array().unwrap().len(), 1); + assert_eq!(json["TxLists"][0].as_array().unwrap().len(), 3); + assert_eq!(json["TxLists"][0][0]["type"], "0x0"); + assert_eq!( + json["TxLists"][0][0]["hash"], + "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd" + ); + assert_eq!(json["TxLists"][0][1]["type"], "0x2"); + assert_eq!( + json["TxLists"][0][1]["hash"], + "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b" + ); + assert_eq!(json["TxLists"][0][2]["type"], "0x2"); + assert_eq!( + json["TxLists"][0][2]["hash"], + "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193" + ); + rpc_server.stop().await; + } + + #[tokio::test] + async fn test_advance_head_to_new_l2_block() { + let (mut rpc_server, taiko) = setup_rpc_server_and_taiko(3040).await; + let value = serde_json::json!({ + "TxLists": [ + [ + { + "type": "0x0", + "chainId": "0x28c61", + "nonce": "0x1", + "to": "0xbfadd5365bb2890ad832038837115e60b71f7cbb", + "gas": "0x267ac", + "gasPrice": "0x5e76e0800", + "maxPriorityFeePerGas": null, + "maxFeePerGas": null, + "value": "0x0", + "input": "0x40d097c30000000000000000000000004cea2c7d358e313f5d0287c475f9ae943fe1a913", + "v": "0x518e6", + "r": "0xb22da5cdc4c091ec85d2dda9054aa497088e55bd9f0335f39864ae1c598dd35", + "s": "0x6eee1bcfe6a1855e89dd23d40942c90a036f273159b4c4fd217d58169493f055", + "hash": "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd" + } + ] + ] + }); + + let response = taiko + .advance_head_to_new_l2_block(value, 1234) + .await + .unwrap(); + assert_eq!( + response["result"], + "Request received and processed successfully" + ); + rpc_server.stop().await; + } + + async fn setup_rpc_server_and_taiko(port: u16) -> (RpcServer, Taiko) { // Start the RPC server let mut rpc_server = RpcServer::new(); - let addr: SocketAddr = "127.0.0.1:3030".parse().unwrap(); + let addr: SocketAddr = format!("127.0.0.1:{}", port).parse().unwrap(); rpc_server.start_test_responses(addr).await.unwrap(); - let taiko = Taiko::new("http://127.0.0.1:3030"); - let json = taiko.get_pending_l2_tx_lists().await.unwrap(); - - assert_eq!(json["result"]["TxLists"].as_array().unwrap().len(), 1); - assert_eq!(json["result"]["TxLists"][0].as_array().unwrap().len(), 3); - assert_eq!(json["result"]["TxLists"][0][0]["type"], "0x0"); - assert_eq!(json["result"]["TxLists"][0][0]["hash"], "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd"); - assert_eq!(json["result"]["TxLists"][0][1]["type"], "0x2"); - assert_eq!(json["result"]["TxLists"][0][1]["hash"], "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b"); - assert_eq!(json["result"]["TxLists"][0][2]["type"], "0x2"); - assert_eq!(json["result"]["TxLists"][0][2]["hash"], "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193"); - rpc_server.stop().await; + let taiko = Taiko::new( + &format!("http://127.0.0.1:{}", port), + &format!("http://127.0.0.1:{}", port), + ); + (rpc_server, taiko) } } diff --git a/Node/src/utils/rpc_server.rs b/Node/src/utils/rpc_server.rs index d9b2ff5..b1443f8 100644 --- a/Node/src/utils/rpc_server.rs +++ b/Node/src/utils/rpc_server.rs @@ -29,6 +29,15 @@ pub mod test { module.register_async_method("RPC.GetL2TxLists", |_, _, _| async { TX_LISTS_RESPONSE.clone() })?; + module.register_async_method( + "RPC.AdvanceL2ChainHeadWithNewBlocks", + |_, _, _| async { + json!({ + "result": "Request received and processed successfully", + "id": 1 + }) + }, + )?; let handle = server.start(module); tokio::spawn(handle.clone().stopped()); @@ -47,66 +56,62 @@ pub mod test { lazy_static! { pub static ref TX_LISTS_RESPONSE: serde_json::Value = json!({ - "result": { - "TxLists": [ - [ - { - "type": "0x0", - "chainId": "0x28c61", - "nonce": "0x1", - "to": "0xbfadd5365bb2890ad832038837115e60b71f7cbb", - "gas": "0x267ac", - "gasPrice": "0x5e76e0800", - "maxPriorityFeePerGas": null, - "maxFeePerGas": null, - "value": "0x0", - "input": "0x40d097c30000000000000000000000004cea2c7d358e313f5d0287c475f9ae943fe1a913", - "v": "0x518e6", - "r": "0xb22da5cdc4c091ec85d2dda9054aa497088e55bd9f0335f39864ae1c598dd35", - "s": "0x6eee1bcfe6a1855e89dd23d40942c90a036f273159b4c4fd217d58169493f055", - "hash": "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd" - }, - { - "type": "0x2", - "chainId": "0x28c61", - "nonce": "0x3f", - "to": "0x380a5ba81efe70fe98ab56613ebf9244a2f3d4c9", - "gas": "0x2c2c8", - "gasPrice": null, - "maxPriorityFeePerGas": "0x1", - "maxFeePerGas": "0x3", - "value": "0x5af3107a4000", - "input": "0x3593564c000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000006672d0a400000000000000000000000000000000000000000000000000000000000000020b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000005af3107a40000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000005af3107a400000000000000000000000000000000000000000000000000000000353ca3e629a00000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002bae2c46ddb314b9ba743c6dee4878f151881333d9000bb8ebf1f662bf092ff0d913a9fe9d7179b0efef1611000000000000000000000000000000000000000000", - "accessList": [], - "v": "0x1", - "r": "0x36517a175a60d3026380318917976fa32c82e542850357a611af05d2212ab9a4", - "s": "0x32d89dce30d76287ddba907b0c662cd09dc30891b1c9c2ef644edfc53160b298", - "yParity": "0x1", - "hash": "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b" - }, - { - "type": "0x2", - "chainId": "0x28c61", - "nonce": "0x39", - "to": "0x380a5ba81efe70fe98ab56613ebf9244a2f3d4c9", - "gas": "0x2c2c8", - "gasPrice": null, - "maxPriorityFeePerGas": "0x1", - "maxFeePerGas": "0x3", - "value": "0x5af3107a4000", - "input": "0x3593564c000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000006672d0d400000000000000000000000000000000000000000000000000000000000000020b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000005af3107a40000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000005af3107a400000000000000000000000000000000000000000000000000000000353ca3e629a00000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002bae2c46ddb314b9ba743c6dee4878f151881333d9000bb8ebf1f662bf092ff0d913a9fe9d7179b0efef1611000000000000000000000000000000000000000000", - "accessList": [], - "v": "0x0", - "r": "0xc779421d1ee81dbd3dfbfad5fd632b45303b4513ea1b8ac0bc647f5430cd97b9", - "s": "0x13cedef844bf5a954183182992ffbf9b8b23331de255157528be7da6614618b2", - "yParity": "0x0", - "hash": "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193" - } - ] + "TxLists": [ + [ + { + "type": "0x0", + "chainId": "0x28c61", + "nonce": "0x1", + "to": "0xbfadd5365bb2890ad832038837115e60b71f7cbb", + "gas": "0x267ac", + "gasPrice": "0x5e76e0800", + "maxPriorityFeePerGas": null, + "maxFeePerGas": null, + "value": "0x0", + "input": "0x40d097c30000000000000000000000004cea2c7d358e313f5d0287c475f9ae943fe1a913", + "v": "0x518e6", + "r": "0xb22da5cdc4c091ec85d2dda9054aa497088e55bd9f0335f39864ae1c598dd35", + "s": "0x6eee1bcfe6a1855e89dd23d40942c90a036f273159b4c4fd217d58169493f055", + "hash": "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd" + }, + { + "type": "0x2", + "chainId": "0x28c61", + "nonce": "0x3f", + "to": "0x380a5ba81efe70fe98ab56613ebf9244a2f3d4c9", + "gas": "0x2c2c8", + "gasPrice": null, + "maxPriorityFeePerGas": "0x1", + "maxFeePerGas": "0x3", + "value": "0x5af3107a4000", + "input": "0x3593564c000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000006672d0a400000000000000000000000000000000000000000000000000000000000000020b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000005af3107a40000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000005af3107a400000000000000000000000000000000000000000000000000000000353ca3e629a00000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002bae2c46ddb314b9ba743c6dee4878f151881333d9000bb8ebf1f662bf092ff0d913a9fe9d7179b0efef1611000000000000000000000000000000000000000000", + "accessList": [], + "v": "0x1", + "r": "0x36517a175a60d3026380318917976fa32c82e542850357a611af05d2212ab9a4", + "s": "0x32d89dce30d76287ddba907b0c662cd09dc30891b1c9c2ef644edfc53160b298", + "yParity": "0x1", + "hash": "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b" + }, + { + "type": "0x2", + "chainId": "0x28c61", + "nonce": "0x39", + "to": "0x380a5ba81efe70fe98ab56613ebf9244a2f3d4c9", + "gas": "0x2c2c8", + "gasPrice": null, + "maxPriorityFeePerGas": "0x1", + "maxFeePerGas": "0x3", + "value": "0x5af3107a4000", + "input": "0x3593564c000000000000000000000000000000000000000000000000000000000000006000000000000000000000000000000000000000000000000000000000000000a0000000000000000000000000000000000000000000000000000000006672d0d400000000000000000000000000000000000000000000000000000000000000020b000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000004000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000200000000000000000000000000000000000000000000000000005af3107a40000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000000000000000000100000000000000000000000000000000000000000000000000005af3107a400000000000000000000000000000000000000000000000000000000353ca3e629a00000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000002bae2c46ddb314b9ba743c6dee4878f151881333d9000bb8ebf1f662bf092ff0d913a9fe9d7179b0efef1611000000000000000000000000000000000000000000", + "accessList": [], + "v": "0x0", + "r": "0xc779421d1ee81dbd3dfbfad5fd632b45303b4513ea1b8ac0bc647f5430cd97b9", + "s": "0x13cedef844bf5a954183182992ffbf9b8b23331de255157528be7da6614618b2", + "yParity": "0x0", + "hash": "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193" + } ] - }, - "error": null, - "id": 1 + ] }); } }