From 913dfa423d01607d249fda1f16cec90be0e3150a Mon Sep 17 00:00:00 2001 From: sistemd Date: Wed, 25 Sep 2024 14:44:01 +0200 Subject: [PATCH 1/2] implement starknet_subscribePendingTransactions --- crates/rpc/src/jsonrpc/router/subscription.rs | 112 ++-- crates/rpc/src/method.rs | 1 + crates/rpc/src/method/subscribe_new_heads.rs | 112 +++- .../method/subscribe_pending_transactions.rs | 521 ++++++++++++++++++ crates/rpc/src/pending.rs | 2 +- crates/rpc/src/v08.rs | 6 +- 6 files changed, 698 insertions(+), 56 deletions(-) create mode 100644 crates/rpc/src/method/subscribe_pending_transactions.rs diff --git a/crates/rpc/src/jsonrpc/router/subscription.rs b/crates/rpc/src/jsonrpc/router/subscription.rs index 6feee7e64c..c7dc988c6f 100644 --- a/crates/rpc/src/jsonrpc/router/subscription.rs +++ b/crates/rpc/src/jsonrpc/router/subscription.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use axum::extract::ws::{Message, WebSocket}; use dashmap::DashMap; @@ -11,7 +12,7 @@ use crate::context::RpcContext; use crate::dto::serialize::SerializeForVersion; use crate::dto::DeserializeForVersion; use crate::error::ApplicationError; -use crate::jsonrpc::{RpcError, RpcRequest, RpcResponse}; +use crate::jsonrpc::{RequestId, RpcError, RpcRequest, RpcResponse}; use crate::{RpcVersion, SubscriptionId}; /// See [`RpcSubscriptionFlow`]. @@ -20,11 +21,11 @@ pub(super) trait RpcSubscriptionEndpoint: Send + Sync { // Start the subscription. async fn invoke( &self, - state: RpcContext, + router: RpcRouter, input: serde_json::Value, subscription_id: SubscriptionId, subscriptions: Arc>>, - version: RpcVersion, + req_id: RequestId, tx: mpsc::Sender>, ) -> Result<(), RpcError>; } @@ -57,12 +58,12 @@ pub(super) trait RpcSubscriptionEndpoint: Send + Sync { /// - Stream the first active update, and then keep streaming the rest. #[axum::async_trait] pub trait RpcSubscriptionFlow: Send + Sync { - type Request: crate::dto::DeserializeForVersion + Send + Sync + 'static; + type Request: crate::dto::DeserializeForVersion + Clone + Send + Sync + 'static; type Notification: crate::dto::serialize::SerializeForVersion + Send + Sync + 'static; /// The block to start streaming from. If the subscription endpoint does not - /// support catching up, the value returned by this method is - /// irrelevant. + /// support catching up, this method should always return + /// [`BlockId::Latest`]. fn starting_block(req: &Self::Request) -> BlockId; /// Fetch historical data from the `from` block to the `to` block. The @@ -78,6 +79,7 @@ pub trait RpcSubscriptionFlow: Send + Sync { /// Subscribe to active updates. async fn subscribe( state: RpcContext, + req: Self::Request, tx: mpsc::Sender>, ); } @@ -101,20 +103,20 @@ where { async fn invoke( &self, - state: RpcContext, + router: RpcRouter, input: serde_json::Value, subscription_id: SubscriptionId, subscriptions: Arc>>, - version: RpcVersion, - tx: mpsc::Sender>, + req_id: RequestId, + ws_tx: mpsc::Sender>, ) -> Result<(), RpcError> { - let req = T::Request::deserialize(crate::dto::Value::new(input, version)) + let req = T::Request::deserialize(crate::dto::Value::new(input, router.version)) .map_err(|e| RpcError::InvalidParams(e.to_string()))?; let tx = SubscriptionSender { subscription_id, subscriptions, - tx, - version, + tx: ws_tx.clone(), + version: router.version, _phantom: Default::default(), }; @@ -128,13 +130,32 @@ where } BlockId::Latest => { // No need to catch up. The code below will subscribe to new blocks. + // Only needs to send the subscription ID to the client. + if ws_tx + .send(Ok(Message::Text( + serde_json::to_string(&RpcResponse { + output: Ok(serde_json::to_value(&SubscriptionIdResult { + subscription_id, + }) + .unwrap()), + id: req_id.clone(), + }) + .unwrap(), + ))) + .await + .is_err() + { + return Ok(()); + } BlockNumber::MAX } BlockId::Number(_) | BlockId::Hash(_) => { // Catch up to the latest block in batches of BATCH_SIZE. + + // Load the first block number, return an error if it's invalid. let first_block = pathfinder_storage::BlockId::try_from(T::starting_block(&req)) .map_err(|e| RpcError::InvalidParams(e.to_string()))?; - let storage = state.storage.clone(); + let storage = router.context.storage.clone(); let mut current_block = tokio::task::spawn_blocking(move || -> Result<_, RpcError> { let mut conn = storage.connection().map_err(RpcError::InternalError)?; @@ -145,11 +166,34 @@ where }) .await .map_err(|e| RpcError::InternalError(e.into()))??; + + // Send the subscription ID to the client. + if ws_tx + .send(Ok(Message::Text( + serde_json::to_string(&RpcResponse { + output: Ok(serde_json::to_value(&SubscriptionIdResult { + subscription_id, + }) + .unwrap()), + id: req_id.clone(), + }) + .unwrap(), + ))) + .await + .is_err() + { + return Ok(()); + } + const BATCH_SIZE: u64 = 64; loop { - let messages = - T::catch_up(&state, &req, current_block, current_block + BATCH_SIZE) - .await?; + let messages = T::catch_up( + &router.context, + &req, + current_block, + current_block + BATCH_SIZE, + ) + .await?; if messages.is_empty() { // Caught up. break; @@ -174,7 +218,10 @@ where // Subscribe to new blocks. Receive the first subscription message. let (tx1, mut rx1) = mpsc::channel::>(1024); - tokio::spawn(T::subscribe(state.clone(), tx1)); + { + let req = req.clone(); + tokio::spawn(T::subscribe(router.context.clone(), req, tx1)); + } let first_msg = match rx1.recv().await { Some(msg) => msg, None => { @@ -188,7 +235,7 @@ where // blocks. Because the catch_up range is inclusive, we need to subtract 1 from // the block number. if let Some(block_number) = first_msg.block_number.parent() { - let messages = T::catch_up(&state, &req, current_block, block_number).await?; + let messages = T::catch_up(&router.context, &req, current_block, block_number).await?; for msg in messages { if tx .send(msg.notification, msg.subscription_name) @@ -415,36 +462,19 @@ pub fn handle_json_rpc_socket( }; // Start the subscription. + let state = state.clone(); let subscription_id = SubscriptionId::next(); - let context = state.context.clone(); - let version = state.version; let ws_tx = ws_tx.clone(); - if ws_tx - .send(Ok(Message::Text( - serde_json::to_string(&RpcResponse { - output: Ok( - serde_json::to_value(&SubscriptionIdResult { subscription_id }) - .unwrap(), - ), - id: req_id.clone(), - }) - .unwrap(), - ))) - .await - .is_err() - { - break; - } let handle = tokio::spawn({ let subscriptions = subscriptions.clone(); async move { if let Err(e) = endpoint .invoke( - context, + state, params, subscription_id, - subscriptions, - version, + subscriptions.clone(), + req_id.clone(), ws_tx.clone(), ) .await @@ -456,6 +486,10 @@ pub fn handle_json_rpc_socket( })) .await .ok(); + while subscriptions.remove(&subscription_id).is_none() { + // Race condition, the insert has not yet happened. + tokio::time::sleep(Duration::from_secs(1)).await; + } } } }); diff --git a/crates/rpc/src/method.rs b/crates/rpc/src/method.rs index 225fc5764a..388602b728 100644 --- a/crates/rpc/src/method.rs +++ b/crates/rpc/src/method.rs @@ -24,6 +24,7 @@ pub mod get_transaction_receipt; pub mod get_transaction_status; pub mod simulate_transactions; pub mod subscribe_new_heads; +pub mod subscribe_pending_transactions; pub mod syncing; pub mod trace_block_transactions; pub mod trace_transaction; diff --git a/crates/rpc/src/method/subscribe_new_heads.rs b/crates/rpc/src/method/subscribe_new_heads.rs index b1c36cee6d..367faf9a90 100644 --- a/crates/rpc/src/method/subscribe_new_heads.rs +++ b/crates/rpc/src/method/subscribe_new_heads.rs @@ -11,28 +11,28 @@ use crate::Reorg; pub struct SubscribeNewHeads; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Request { - block: BlockId, + block: Option, } impl crate::dto::DeserializeForVersion for Request { fn deserialize(value: crate::dto::Value) -> Result { value.deserialize_map(|value| { Ok(Self { - block: value.deserialize_serde("block")?, + block: value.deserialize_optional_serde("block")?, }) }) } } #[derive(Debug)] -pub enum Message { +pub enum Notification { BlockHeader(Arc), Reorg(Arc), } -impl crate::dto::serialize::SerializeForVersion for Message { +impl crate::dto::serialize::SerializeForVersion for Notification { fn serialize( &self, serializer: crate::dto::serialize::Serializer, @@ -49,10 +49,10 @@ const SUBSCRIPTION_NAME: &str = "starknet_subscriptionNewHeads"; #[async_trait] impl RpcSubscriptionFlow for SubscribeNewHeads { type Request = Request; - type Notification = Message; + type Notification = Notification; fn starting_block(req: &Self::Request) -> BlockId { - req.block + req.block.unwrap_or(BlockId::Latest) } async fn catch_up( @@ -74,7 +74,7 @@ impl RpcSubscriptionFlow for SubscribeNewHeads { .map(|header| { let block_number = header.number; SubscriptionMessage { - notification: Message::BlockHeader(header.into()), + notification: Notification::BlockHeader(header.into()), block_number, subscription_name: SUBSCRIPTION_NAME, } @@ -84,6 +84,7 @@ impl RpcSubscriptionFlow for SubscribeNewHeads { async fn subscribe( state: RpcContext, + _req: Self::Request, tx: mpsc::Sender>, ) { let mut headers = state.notifications.block_headers.subscribe(); @@ -95,7 +96,7 @@ impl RpcSubscriptionFlow for SubscribeNewHeads { Ok(reorg) => { let block_number = reorg.first_block_number; if tx.send(SubscriptionMessage { - notification: Message::Reorg(reorg), + notification: Notification::Reorg(reorg), block_number, subscription_name: REORG_SUBSCRIPTION_NAME, }).await.is_err() { @@ -117,7 +118,7 @@ impl RpcSubscriptionFlow for SubscribeNewHeads { Ok(header) => { let block_number = header.number; if tx.send(SubscriptionMessage { - notification: Message::BlockHeader(header), + notification: Notification::BlockHeader(header), block_number, subscription_name: SUBSCRIPTION_NAME, }).await.is_err() { @@ -151,14 +152,14 @@ mod tests { use tokio::sync::mpsc; use crate::context::{RpcConfig, RpcContext}; - use crate::jsonrpc::{handle_json_rpc_socket, RpcResponse, RpcRouter}; + use crate::jsonrpc::{handle_json_rpc_socket, RequestId, RpcError, RpcResponse, RpcRouter}; use crate::pending::PendingWatcher; use crate::v02::types::syncing::Syncing; use crate::{v08, Notifications, Reorg, SubscriptionId, SyncState}; #[tokio::test] async fn happy_path_with_historic_blocks() { - happy_path_test(1000).await; + happy_path_test(2000).await; } #[tokio::test] @@ -306,6 +307,88 @@ mod tests { assert!(sender_rx.is_empty()); } + #[tokio::test] + async fn invalid_subscription() { + let router = setup(0).await; + let (sender_tx, mut sender_rx) = mpsc::channel(1024); + let (receiver_tx, receiver_rx) = mpsc::channel(1024); + handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); + receiver_tx + .send(Ok(Message::Text( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "starknet_subscribeNewHeads", + }) + .to_string(), + ))) + .await + .unwrap(); + let res = sender_rx.recv().await.unwrap(); + let res = match res { + Ok(_) => panic!("Expected Err, got Ok"), + Err(e) => e, + }; + assert_eq!( + res, + RpcResponse { + output: Err(RpcError::InvalidParams( + "expected object or array".to_string() + )), + id: RequestId::Number(1), + } + ); + } + + #[tokio::test] + async fn subscribe_no_params() { + let router = setup(0).await; + let (sender_tx, mut sender_rx) = mpsc::channel(1024); + let (receiver_tx, receiver_rx) = mpsc::channel(1024); + handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); + receiver_tx + .send(Ok(Message::Text( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "starknet_subscribeNewHeads", + "params": {} + }) + .to_string(), + ))) + .await + .unwrap(); + let res = sender_rx.recv().await.unwrap().unwrap(); + let subscription_id = match res { + Message::Text(json) => { + let json: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(json["jsonrpc"], "2.0"); + assert_eq!(json["id"], 1); + json["result"]["subscription_id"].as_u64().unwrap() + } + _ => panic!("Expected text message"), + }; + for i in 0..10 { + retry(|| { + router + .context + .notifications + .block_headers + .send(sample_header(i).into()) + }) + .await + .unwrap(); + let expected = sample_new_heads_message(i, subscription_id); + let header = sender_rx.recv().await.unwrap().unwrap(); + let json: serde_json::Value = match header { + Message::Text(json) => serde_json::from_str(&json).unwrap(), + _ => panic!("Expected text message"), + }; + assert_eq!(json, expected); + } + assert!(sender_rx.is_empty()); + } + #[tokio::test] async fn unsubscribe() { let (tx, mut rx, subscription_id, router) = happy_path_test(0).await; @@ -501,11 +584,12 @@ mod tests { where E: std::fmt::Debug, { - for i in 0..25 { + const RETRIES: u64 = 25; + for i in 0..RETRIES { match cb() { Ok(result) => return Ok(result), Err(e) => { - if i == 24 { + if i == RETRIES - 1 { return Err(e); } tokio::time::sleep(Duration::from_secs(i)).await; diff --git a/crates/rpc/src/method/subscribe_pending_transactions.rs b/crates/rpc/src/method/subscribe_pending_transactions.rs new file mode 100644 index 0000000000..b8b35608d7 --- /dev/null +++ b/crates/rpc/src/method/subscribe_pending_transactions.rs @@ -0,0 +1,521 @@ +use std::collections::HashSet; + +use axum::async_trait; +use pathfinder_common::transaction::Transaction; +use pathfinder_common::{BlockId, BlockNumber, ContractAddress, TransactionHash}; +use tokio::sync::mpsc; + +use crate::context::RpcContext; +use crate::jsonrpc::{RpcError, RpcSubscriptionFlow, SubscriptionMessage}; + +pub struct SubscribePendingTransactions; + +#[derive(Debug, Clone)] +pub struct Request { + transaction_details: Option, + sender_address: Option>, +} + +impl crate::dto::DeserializeForVersion for Request { + fn deserialize(value: crate::dto::Value) -> Result { + value.deserialize_map(|value| { + Ok(Self { + transaction_details: value.deserialize_optional_serde("transaction_details")?, + sender_address: value + .deserialize_optional_array("sender_address", |addr| { + Ok(ContractAddress(addr.deserialize()?)) + })? + .map(|addrs| addrs.into_iter().collect()), + }) + }) + } +} + +#[derive(Debug)] +pub enum Notification { + Transaction(Box), + TransactionHash(TransactionHash), +} + +impl crate::dto::serialize::SerializeForVersion for Notification { + fn serialize( + &self, + serializer: crate::dto::serialize::Serializer, + ) -> Result { + match self { + Notification::Transaction(transaction) => { + crate::dto::Transaction(transaction).serialize(serializer) + } + Notification::TransactionHash(transaction_hash) => { + transaction_hash.0.serialize(serializer) + } + } + } +} + +const SUBSCRIPTION_NAME: &str = "starknet_subscriptionPendingTransactions"; + +#[async_trait] +impl RpcSubscriptionFlow for SubscribePendingTransactions { + type Request = Request; + type Notification = Notification; + + fn starting_block(_req: &Self::Request) -> BlockId { + // Rollback is not supported. + BlockId::Latest + } + + async fn catch_up( + _state: &RpcContext, + _req: &Self::Request, + _from: BlockNumber, + _to: BlockNumber, + ) -> Result>, RpcError> { + Ok(vec![]) + } + + async fn subscribe( + state: RpcContext, + req: Self::Request, + tx: mpsc::Sender>, + ) { + let mut pending_data = state.pending_data.0.clone(); + // Last block sent to the subscriber. Initial value doesn't really matter + let mut last_block = BlockNumber::GENESIS; + // Hashes of transactions that have already been sent to the subscriber, as part + // of `last_block` block. It is necessary to keep track of this because the + // pending data updates might include new transactions for the same + // block number. + let mut sent_txs = HashSet::new(); + loop { + let pending = pending_data.borrow_and_update().clone(); + if pending.number != last_block { + last_block = pending.number; + sent_txs.clear(); + } + for transaction in pending.block.transactions.iter() { + if sent_txs.contains(&transaction.hash) { + continue; + } + // Filter the transactions by sender address. + if let Some(sender_address) = &req.sender_address { + use pathfinder_common::transaction::TransactionVariant::*; + let address = match &transaction.variant { + DeclareV0(tx) => tx.sender_address, + DeclareV1(tx) => tx.sender_address, + DeclareV2(tx) => tx.sender_address, + DeclareV3(tx) => tx.sender_address, + DeployV0(tx) => tx.contract_address, + DeployV1(tx) => tx.contract_address, + DeployAccountV1(tx) => tx.contract_address, + DeployAccountV3(tx) => tx.contract_address, + InvokeV0(tx) => tx.sender_address, + InvokeV1(tx) => tx.sender_address, + InvokeV3(tx) => tx.sender_address, + L1Handler(tx) => tx.contract_address, + }; + if !sender_address.contains(&address) { + continue; + } + } + let notification = match req.transaction_details { + Some(true) => Notification::Transaction(transaction.clone().into()), + Some(false) | None => Notification::TransactionHash(transaction.hash), + }; + sent_txs.insert(transaction.hash); + if tx + .send(SubscriptionMessage { + notification, + block_number: pending.number, + subscription_name: SUBSCRIPTION_NAME, + }) + .await + .is_err() + { + // Subscription has been closed. + return; + } + } + if pending_data.changed().await.is_err() { + tracing::debug!("Pending data channel closed, stopping subscription"); + break; + } + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use axum::extract::ws::Message; + use pathfinder_common::transaction::{DeclareTransactionV0V1, Transaction, TransactionVariant}; + use pathfinder_common::{ + contract_address, + transaction_hash, + BlockNumber, + ChainId, + ContractAddress, + TransactionHash, + }; + use pathfinder_storage::StorageBuilder; + use starknet_gateway_client::Client; + use starknet_gateway_types::reply::PendingBlock; + use tokio::sync::{mpsc, watch}; + + use crate::context::{RpcConfig, RpcContext}; + use crate::jsonrpc::{handle_json_rpc_socket, RpcResponse}; + use crate::pending::PendingWatcher; + use crate::v02::types::syncing::Syncing; + use crate::{v08, Notifications, PendingData, SyncState}; + + #[tokio::test] + async fn no_filtering_no_details() { + let Setup { + tx, + mut rx, + pending_data_tx, + } = setup(); + tx.send(Ok(Message::Text( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "starknet_subscribePendingTransactions", + "params": {} + }) + .to_string(), + ))) + .await + .unwrap(); + let response = rx.recv().await.unwrap().unwrap(); + let subscription_id = match response { + Message::Text(json) => { + let json: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(json["jsonrpc"], "2.0"); + assert_eq!(json["id"], 1); + json["result"]["subscription_id"].as_u64().unwrap() + } + _ => { + panic!("Expected text message"); + } + }; + pending_data_tx + .send(sample_block( + BlockNumber::GENESIS, + vec![ + (contract_address!("0x1"), transaction_hash!("0x1")), + (contract_address!("0x2"), transaction_hash!("0x2")), + ], + )) + .unwrap(); + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x1", subscription_id) + ); + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x2", subscription_id) + ); + assert!(rx.is_empty()); + pending_data_tx + .send(sample_block( + BlockNumber::GENESIS, + vec![ + (contract_address!("0x1"), transaction_hash!("0x1")), + (contract_address!("0x2"), transaction_hash!("0x2")), + (contract_address!("0x3"), transaction_hash!("0x3")), + (contract_address!("0x4"), transaction_hash!("0x4")), + (contract_address!("0x5"), transaction_hash!("0x5")), + ], + )) + .unwrap(); + // Assert that same transactions are not sent twice. + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x3", subscription_id) + ); + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x4", subscription_id) + ); + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x5", subscription_id) + ); + // Assert that transactions from new blocks are sent correctly. + pending_data_tx + .send(sample_block( + BlockNumber::GENESIS + 1, + vec![(contract_address!("0x1"), transaction_hash!("0x1"))], + )) + .unwrap(); + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x1", subscription_id) + ); + assert!(rx.is_empty()); + } + + #[tokio::test] + async fn no_filtering_with_details() { + let Setup { + tx, + mut rx, + pending_data_tx, + } = setup(); + tx.send(Ok(Message::Text( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "starknet_subscribePendingTransactions", + "params": { + "transaction_details": true + } + }) + .to_string(), + ))) + .await + .unwrap(); + let response = rx.recv().await.unwrap().unwrap(); + let subscription_id = match response { + Message::Text(json) => { + let json: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(json["jsonrpc"], "2.0"); + assert_eq!(json["id"], 1); + json["result"]["subscription_id"].as_u64().unwrap() + } + _ => { + panic!("Expected text message"); + } + }; + assert!(rx.is_empty()); + pending_data_tx + .send(sample_block( + BlockNumber::GENESIS, + vec![ + (contract_address!("0x1"), transaction_hash!("0x3")), + (contract_address!("0x2"), transaction_hash!("0x4")), + ], + )) + .unwrap(); + assert_eq!( + recv(&mut rx).await, + sample_message_with_details("0x1", "0x3", subscription_id) + ); + assert_eq!( + recv(&mut rx).await, + sample_message_with_details("0x2", "0x4", subscription_id) + ); + assert!(rx.is_empty()); + } + + #[tokio::test] + async fn filtering_one_address() { + let Setup { + tx, + mut rx, + pending_data_tx, + } = setup(); + tx.send(Ok(Message::Text( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "starknet_subscribePendingTransactions", + "params": { + "sender_address": ["0x1"] + } + }) + .to_string(), + ))) + .await + .unwrap(); + let response = rx.recv().await.unwrap().unwrap(); + let subscription_id = match response { + Message::Text(json) => { + let json: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(json["jsonrpc"], "2.0"); + assert_eq!(json["id"], 1); + json["result"]["subscription_id"].as_u64().unwrap() + } + _ => { + panic!("Expected text message"); + } + }; + assert!(rx.is_empty()); + pending_data_tx + .send(sample_block( + BlockNumber::GENESIS, + vec![ + (contract_address!("0x1"), transaction_hash!("0x1")), + (contract_address!("0x2"), transaction_hash!("0x2")), + ], + )) + .unwrap(); + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x1", subscription_id) + ); + assert!(rx.is_empty()); + } + + #[tokio::test] + async fn filtering_two_addresses() { + let Setup { + tx, + mut rx, + pending_data_tx, + } = setup(); + tx.send(Ok(Message::Text( + serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "starknet_subscribePendingTransactions", + "params": { + "sender_address": ["0x1", "0x2"] + } + }) + .to_string(), + ))) + .await + .unwrap(); + let response = rx.recv().await.unwrap().unwrap(); + let subscription_id = match response { + Message::Text(json) => { + let json: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(json["jsonrpc"], "2.0"); + assert_eq!(json["id"], 1); + json["result"]["subscription_id"].as_u64().unwrap() + } + _ => { + panic!("Expected text message"); + } + }; + assert!(rx.is_empty()); + pending_data_tx + .send(sample_block( + BlockNumber::GENESIS, + vec![ + (contract_address!("0x1"), transaction_hash!("0x3")), + (contract_address!("0x2"), transaction_hash!("0x4")), + (contract_address!("0x3"), transaction_hash!("0x5")), + ], + )) + .unwrap(); + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x3", subscription_id) + ); + assert_eq!( + recv(&mut rx).await, + sample_message_no_details("0x4", subscription_id) + ); + assert!(rx.is_empty()); + } + + async fn recv(rx: &mut mpsc::Receiver>) -> serde_json::Value { + let res = rx.recv().await.unwrap().unwrap(); + match res { + Message::Text(json) => serde_json::from_str(&json).unwrap(), + _ => panic!("Expected text message"), + } + } + + fn sample_block( + block_number: BlockNumber, + txs: Vec<(ContractAddress, TransactionHash)>, + ) -> PendingData { + PendingData { + block: PendingBlock { + transactions: txs + .into_iter() + .map(|(sender_address, hash)| Transaction { + variant: TransactionVariant::DeclareV0(DeclareTransactionV0V1 { + sender_address, + ..Default::default() + }), + hash, + }) + .collect(), + ..Default::default() + } + .into(), + number: block_number, + ..Default::default() + } + } + + fn sample_message_no_details(hash: &str, subscription_id: u64) -> serde_json::Value { + serde_json::json!({ + "jsonrpc":"2.0", + "method":"starknet_subscriptionPendingTransactions", + "params": { + "result": hash, + "subscription_id": subscription_id + } + }) + } + + fn sample_message_with_details( + sender_address: &str, + hash: &str, + subscription_id: u64, + ) -> serde_json::Value { + serde_json::json!({ + "jsonrpc":"2.0", + "method":"starknet_subscriptionPendingTransactions", + "params": { + "result": { + "class_hash": "0x0", + "max_fee": "0x0", + "sender_address": sender_address, + "signature": [], + "transaction_hash": hash, + "type": "DECLARE", + "version": "0x0" + }, + "subscription_id": subscription_id + } + }) + } + + fn setup() -> Setup { + let storage = StorageBuilder::in_memory().unwrap(); + let (pending_data_tx, pending_data) = tokio::sync::watch::channel(Default::default()); + let notifications = Notifications::default(); + let ctx = RpcContext { + cache: Default::default(), + storage, + execution_storage: StorageBuilder::in_memory().unwrap(), + pending_data: PendingWatcher::new(pending_data), + sync_status: SyncState { + status: Syncing::False(false).into(), + } + .into(), + chain_id: ChainId::MAINNET, + sequencer: Client::mainnet(Duration::from_secs(10)), + websocket: None, + notifications, + config: RpcConfig { + batch_concurrency_limit: 1.try_into().unwrap(), + get_events_max_blocks_to_scan: 1.try_into().unwrap(), + get_events_max_uncached_bloom_filters_to_load: 1.try_into().unwrap(), + custom_versioned_constants: None, + }, + }; + let router = v08::register_routes().build(ctx); + let (sender_tx, sender_rx) = mpsc::channel(1024); + let (receiver_tx, receiver_rx) = mpsc::channel(1024); + handle_json_rpc_socket(router.clone(), sender_tx, receiver_rx); + Setup { + tx: receiver_tx, + rx: sender_rx, + pending_data_tx, + } + } + + struct Setup { + tx: mpsc::Sender>, + rx: mpsc::Receiver>, + pending_data_tx: watch::Sender, + } +} diff --git a/crates/rpc/src/pending.rs b/crates/rpc/src/pending.rs index 2c5ac75636..2fc503ba8e 100644 --- a/crates/rpc/src/pending.rs +++ b/crates/rpc/src/pending.rs @@ -9,7 +9,7 @@ use tokio::sync::watch::Receiver as WatchReceiver; /// Provides the latest [PendingData] which is consistent with a given /// view of storage. #[derive(Clone)] -pub struct PendingWatcher(WatchReceiver); +pub struct PendingWatcher(pub WatchReceiver); #[derive(Clone, Default, Debug, PartialEq)] pub struct PendingData { diff --git a/crates/rpc/src/v08.rs b/crates/rpc/src/v08.rs index f80a312bc8..def8860c74 100644 --- a/crates/rpc/src/v08.rs +++ b/crates/rpc/src/v08.rs @@ -1,9 +1,11 @@ use crate::jsonrpc::{RpcRouter, RpcRouterBuilder}; use crate::method::subscribe_new_heads::SubscribeNewHeads; +use crate::method::subscribe_pending_transactions::SubscribePendingTransactions; #[rustfmt::skip] pub fn register_routes() -> RpcRouterBuilder { RpcRouter::builder(crate::RpcVersion::V08) - .register("starknet_subscribeNewHeads", SubscribeNewHeads) - .register("starknet_specVersion", || "0.8.0") + .register("starknet_subscribeNewHeads", SubscribeNewHeads) + .register("starknet_subscribePendingTransactions", SubscribePendingTransactions) + .register("starknet_specVersion", || "0.8.0") } From 0cd34b1d0f0c60040616c956d7080b2f9b2e3b14 Mon Sep 17 00:00:00 2001 From: sistemd Date: Wed, 25 Sep 2024 14:44:01 +0200 Subject: [PATCH 2/2] fix version --- crates/rpc/src/v08.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/rpc/src/v08.rs b/crates/rpc/src/v08.rs index def8860c74..7cb46f5a94 100644 --- a/crates/rpc/src/v08.rs +++ b/crates/rpc/src/v08.rs @@ -7,5 +7,5 @@ pub fn register_routes() -> RpcRouterBuilder { RpcRouter::builder(crate::RpcVersion::V08) .register("starknet_subscribeNewHeads", SubscribeNewHeads) .register("starknet_subscribePendingTransactions", SubscribePendingTransactions) - .register("starknet_specVersion", || "0.8.0") + .register("starknet_specVersion", || "0.8.0-rc0") }