Skip to content

Commit

Permalink
submit new l2 blocks using RPC (#28)
Browse files Browse the repository at this point in the history
* submit_new_l2_blocks implemented

* fixed tests

* Fix loops in Node/src/node/mod.rs to be consistent (#38)

every 4 seconds and not blocked by channel
processing

Co-authored-by: Ahmad Bitar <[email protected]>

* Two RCP addresses for the Node

fixed some JSON RPC compatibility with taiko-clients

* unused import

* small fixes

---------

Co-authored-by: Ahmad Bitar <[email protected]>
Co-authored-by: Ahmad Bitar <[email protected]>
  • Loading branch information
3 people committed Jun 26, 2024
1 parent 463204c commit a1327d3
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 99 deletions.
63 changes: 42 additions & 21 deletions Node/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -1,56 +1,77 @@
use crate::taiko::Taiko;
use anyhow::{anyhow as err, Context, Error};
use anyhow::{anyhow as any_err, Error, Ok};
use tokio::sync::mpsc::{Receiver, Sender};

pub struct Node {
taiko: Taiko,
node_rx: Receiver<String>,
node_rx: Option<Receiver<String>>,
avs_p2p_tx: Sender<String>,
gas_used: u64,
}

impl Node {
pub fn new(node_rx: Receiver<String>, avs_p2p_tx: Sender<String>) -> Self {
let taiko = Taiko::new("http://127.0.0.1:1234");
let taiko = Taiko::new("http://127.0.0.1:1234", "http://127.0.0.1:1235");
Self {
taiko,
node_rx,
node_rx: Some(node_rx),
avs_p2p_tx,
gas_used: 0,
}
}

/// Consumes the Node and starts two loops:
/// one for handling incoming messages and one for the block preconfirmation
pub async fn entrypoint(mut self) -> Result<(), Error> {
tracing::info!("Starting node");
self.start_new_msg_receiver_thread();
self.preconfirmation_loop().await;
Ok(())
}

fn start_new_msg_receiver_thread(&mut self) {
if let Some(node_rx) = self.node_rx.take() {
tokio::spawn(async move {
Self::handle_incoming_messages(node_rx).await;
});
} else {
tracing::error!("node_rx has already been moved");
}
}

async fn handle_incoming_messages(mut node_rx: Receiver<String>) {
loop {
if let Err(err) = self.step().await {
tracing::error!("Node processing step failed: {}", err);
tokio::select! {
Some(message) = node_rx.recv() => {
tracing::debug!("Node received message: {}", message);
},
}
}
}

async fn step(&mut self) -> Result<(), Error> {
if let Ok(msg) = self.node_rx.try_recv() {
self.process_incoming_message(msg).await?;
} else {
self.main_block_preconfirmation_step().await?;
async fn preconfirmation_loop(&self) {
loop {
let start_time = tokio::time::Instant::now();
if let Err(err) = self.main_block_preconfirmation_step().await {
tracing::error!("Failed to execute main block preconfirmation step: {}", err);
}
let elapsed = start_time.elapsed();
let sleep_duration = std::time::Duration::from_secs(4).saturating_sub(elapsed);
tokio::time::sleep(sleep_duration).await;
}
Ok(())
}

async fn main_block_preconfirmation_step(&self) -> Result<(), Error> {
self.taiko
let pending_tx_lists = self
.taiko
.get_pending_l2_tx_lists()
.await
.context("Failed to get pending l2 tx lists")?;
.map_err(Error::from)?;
self.commit_to_the_tx_lists();
self.send_preconfirmations_to_the_avs_p2p().await?;
self.taiko.submit_new_l2_blocks();
Ok(())
}

async fn process_incoming_message(&mut self, msg: String) -> Result<(), Error> {
tracing::debug!("Node received message: {}", msg);
self.taiko
.advance_head_to_new_l2_block(pending_tx_lists, self.gas_used)
.await?;
Ok(())
}

Expand All @@ -62,6 +83,6 @@ impl Node {
self.avs_p2p_tx
.send("Hello from node!".to_string())
.await
.map_err(|e| err!("Failed to send message to avs_p2p_tx: {}", e))
.map_err(|e| any_err!("Failed to send message to avs_p2p_tx: {}", e))
}
}
2 changes: 1 addition & 1 deletion Node/src/p2p_network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ impl AVSp2p {
.send("Hello from avs p2p!".to_string())
.await
.unwrap();
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
}
});

Expand Down
103 changes: 85 additions & 18 deletions Node/src/taiko/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,38 @@ use anyhow::Error;
use serde_json::Value;

pub struct Taiko {
rpc_client: RpcClient,
rpc_proposer: RpcClient,
rpc_driver: RpcClient,
}

impl Taiko {
pub fn new(url: &str) -> Self {
pub fn new(proposer_url: &str, driver_url: &str) -> Self {
Self {
rpc_client: RpcClient::new(url),
rpc_proposer: RpcClient::new(proposer_url),
rpc_driver: RpcClient::new(driver_url),
}
}

pub async fn get_pending_l2_tx_lists(&self) -> Result<Value, Error> {
tracing::debug!("Getting L2 tx lists");
self.rpc_client
self.rpc_proposer
.call_method("RPC.GetL2TxLists", vec![])
.await
}

pub fn submit_new_l2_blocks(&self) {
pub async fn advance_head_to_new_l2_block(
&self,
tx_lists: Value,
gas_used: u64,
) -> Result<Value, Error> {
tracing::debug!("Submitting new L2 blocks");
let payload = serde_json::json!({
"TxLists": tx_lists["TxLists"],
"gasUsed": gas_used,
});
self.rpc_driver
.call_method("RPC.AdvanceL2ChainHeadWithNewBlocks", vec![payload])
.await
}
}

Expand All @@ -35,22 +48,76 @@ mod test {
async fn test_get_pending_l2_tx_lists() {
tracing_subscriber::fmt::init();

let (mut rpc_server, taiko) = setup_rpc_server_and_taiko(3030).await;
let json = taiko.get_pending_l2_tx_lists().await.unwrap();

assert_eq!(json["TxLists"].as_array().unwrap().len(), 1);
assert_eq!(json["TxLists"][0].as_array().unwrap().len(), 3);
assert_eq!(json["TxLists"][0][0]["type"], "0x0");
assert_eq!(
json["TxLists"][0][0]["hash"],
"0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd"
);
assert_eq!(json["TxLists"][0][1]["type"], "0x2");
assert_eq!(
json["TxLists"][0][1]["hash"],
"0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b"
);
assert_eq!(json["TxLists"][0][2]["type"], "0x2");
assert_eq!(
json["TxLists"][0][2]["hash"],
"0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193"
);
rpc_server.stop().await;
}

#[tokio::test]
async fn test_advance_head_to_new_l2_block() {
let (mut rpc_server, taiko) = setup_rpc_server_and_taiko(3040).await;
let value = serde_json::json!({
"TxLists": [
[
{
"type": "0x0",
"chainId": "0x28c61",
"nonce": "0x1",
"to": "0xbfadd5365bb2890ad832038837115e60b71f7cbb",
"gas": "0x267ac",
"gasPrice": "0x5e76e0800",
"maxPriorityFeePerGas": null,
"maxFeePerGas": null,
"value": "0x0",
"input": "0x40d097c30000000000000000000000004cea2c7d358e313f5d0287c475f9ae943fe1a913",
"v": "0x518e6",
"r": "0xb22da5cdc4c091ec85d2dda9054aa497088e55bd9f0335f39864ae1c598dd35",
"s": "0x6eee1bcfe6a1855e89dd23d40942c90a036f273159b4c4fd217d58169493f055",
"hash": "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd"
}
]
]
});

let response = taiko
.advance_head_to_new_l2_block(value, 1234)
.await
.unwrap();
assert_eq!(
response["result"],
"Request received and processed successfully"
);
rpc_server.stop().await;
}

async fn setup_rpc_server_and_taiko(port: u16) -> (RpcServer, Taiko) {
// Start the RPC server
let mut rpc_server = RpcServer::new();
let addr: SocketAddr = "127.0.0.1:3030".parse().unwrap();
let addr: SocketAddr = format!("127.0.0.1:{}", port).parse().unwrap();
rpc_server.start_test_responses(addr).await.unwrap();

let taiko = Taiko::new("http://127.0.0.1:3030");
let json = taiko.get_pending_l2_tx_lists().await.unwrap();

assert_eq!(json["result"]["TxLists"].as_array().unwrap().len(), 1);
assert_eq!(json["result"]["TxLists"][0].as_array().unwrap().len(), 3);
assert_eq!(json["result"]["TxLists"][0][0]["type"], "0x0");
assert_eq!(json["result"]["TxLists"][0][0]["hash"], "0x7c76b9906579e54df54fe77ad1706c47aca706b3eb5cfd8a30ccc3c5a19e8ecd");
assert_eq!(json["result"]["TxLists"][0][1]["type"], "0x2");
assert_eq!(json["result"]["TxLists"][0][1]["hash"], "0xece2a3c6ca097cfe5d97aad4e79393240f63865210f9c763703d1136f065298b");
assert_eq!(json["result"]["TxLists"][0][2]["type"], "0x2");
assert_eq!(json["result"]["TxLists"][0][2]["hash"], "0xb105d9f16e8fb913093c8a2c595bf4257328d256f218a05be8dcc626ddeb4193");
rpc_server.stop().await;
let taiko = Taiko::new(
&format!("http://127.0.0.1:{}", port),
&format!("http://127.0.0.1:{}", port),
);
(rpc_server, taiko)
}
}
Loading

0 comments on commit a1327d3

Please sign in to comment.