From 6fa9effa7b5c597765d734186d7265efa64b0f40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Tr=C4=85d?= Date: Tue, 18 Jun 2024 18:58:15 +0200 Subject: [PATCH] Restrict get_tx by relayer id (#49) --- src/db.rs | 129 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/server.rs | 28 +++++++---- 2 files changed, 143 insertions(+), 14 deletions(-) diff --git a/src/db.rs b/src/db.rs index c28e971..f27db80 100644 --- a/src/db.rs +++ b/src/db.rs @@ -874,7 +874,30 @@ impl Database { } #[instrument(skip(self), level = "debug")] - pub async fn read_txs( + pub async fn read_relayer_tx( + &self, + relayer_id: &str, + tx_id: &str, + ) -> eyre::Result> { + Ok(sqlx::query_as( + r#" + SELECT t.id as tx_id, t.tx_to as to, t.data, t.value, t.gas_limit, t.nonce, + t.blobs, h.tx_hash, s.status + FROM transactions t + LEFT JOIN sent_transactions s ON t.id = s.tx_id + LEFT JOIN tx_hashes h ON s.valid_tx_hash = h.tx_hash + WHERE t.id = $1 + AND t.relayer_id = $2 + "#, + ) + .bind(tx_id) + .bind(relayer_id) + .fetch_optional(&self.pool) + .await?) + } + + #[instrument(skip(self), level = "debug")] + pub async fn read_relayer_txs( &self, relayer_id: &str, tx_status_filter: Option>, @@ -902,6 +925,32 @@ impl Database { .await?) } + #[instrument(skip(self), level = "debug")] + pub async fn read_txs( + &self, + tx_status_filter: Option>, + ) -> eyre::Result> { + let (should_filter, status_filter) = match tx_status_filter { + Some(status) => (true, status), + None => (false, None), + }; + + Ok(sqlx::query_as( + r#" + SELECT t.id as tx_id, t.tx_to as to, t.data, t.value, t.gas_limit, t.nonce, + t. blobs, h.tx_hash, s.status + FROM transactions t + LEFT JOIN sent_transactions s ON t.id = s.tx_id + LEFT JOIN tx_hashes h ON s.valid_tx_hash = h.tx_hash + WHERE ($1 = true AND s.status = $2) OR $1 = false + "#, + ) + .bind(should_filter) + .bind(status_filter) + .fetch_all(&self.pool) + .await?) + } + #[instrument(skip(self), level = "debug")] pub async fn update_relayer_nonce( &self, @@ -1495,7 +1544,7 @@ mod tests { assert_eq!(tx.tx_hash, None); assert_eq!(tx.blobs, None); - let unsent_txs = db.read_txs(relayer_id, None).await?; + let unsent_txs = db.read_relayer_txs(relayer_id, None).await?; assert_eq!(unsent_txs.len(), 1, "1 unsent tx"); let tx_hash_1 = H256::from_low_u64_be(1); @@ -1516,15 +1565,15 @@ mod tests { assert_eq!(tx.tx_hash.unwrap().0, tx_hash_1); assert_eq!(tx.status, Some(TxStatus::Pending)); - let unsent_txs = db.read_txs(relayer_id, Some(None)).await?; + let unsent_txs = db.read_relayer_txs(relayer_id, Some(None)).await?; assert_eq!(unsent_txs.len(), 0, "0 unsent tx"); let pending_txs = db - .read_txs(relayer_id, Some(Some(TxStatus::Pending))) + .read_relayer_txs(relayer_id, Some(Some(TxStatus::Pending))) .await?; assert_eq!(pending_txs.len(), 1, "1 pending tx"); - let all_txs = db.read_txs(relayer_id, None).await?; + let all_txs = db.read_relayer_txs(relayer_id, None).await?; assert_eq!(all_txs, pending_txs); @@ -1610,6 +1659,76 @@ mod tests { Ok(()) } + #[tokio::test] + async fn read_tx() -> eyre::Result<()> { + let (db, _db_container) = setup_db().await?; + + let chain_id = 123; + let network_name = "network_name"; + let http_rpc = "http_rpc"; + let ws_rpc = "ws_rpc"; + + db.upsert_network(chain_id, network_name, http_rpc, ws_rpc) + .await?; + + let relayer_1_id = uuid(); + let relayer_1_id = relayer_1_id.as_str(); + let relayer_1_name = "relayer_1"; + + let relayer_2_id = uuid(); + let relayer_2_id = relayer_2_id.as_str(); + let relayer_2_name = "relayer_2"; + + let key_id = "key_id"; + let relayer_address = Address::from_low_u64_be(1); + + db.create_relayer( + relayer_1_id, + relayer_1_name, + chain_id, + key_id, + relayer_address, + ) + .await?; + + db.create_relayer( + relayer_2_id, + relayer_2_name, + chain_id, + key_id, + relayer_address, + ) + .await?; + + let tx_id = "tx_id"; + let to = Address::from_low_u64_be(1); + let data: &[u8] = &[]; + let value = U256::from(0); + let gas_limit = U256::from(0); + let priority = TransactionPriority::Regular; + let blobs = None; + + db.create_transaction( + tx_id, + to, + data, + value, + gas_limit, + priority, + blobs, + relayer_1_id, + ) + .await?; + + let tx = db.read_tx(tx_id).await?; + assert!(tx.is_some(), "Tx can be read via read_tx"); + + let tx = db.read_relayer_tx(relayer_2_id, tx_id).await?; + assert!(tx.is_none(), "Tx cannot be read by relayer 2"); + + Ok(()) + } + #[tokio::test] async fn blocks() -> eyre::Result<()> { let (db, _db_container) = setup_db().await?; diff --git a/src/server.rs b/src/server.rs index 6adc1d0..aff9efc 100644 --- a/src/server.rs +++ b/src/server.rs @@ -278,12 +278,18 @@ impl RelayerApi { ) -> Result> { api_token.validate(app).await?; - let tx = app.db.read_tx(&tx_id).await?.ok_or_else(|| { - poem::error::Error::from_string( - "Transaction not found".to_string(), - StatusCode::NOT_FOUND, - ) - })?; + let relayer_id = api_token.relayer_id(); + + let tx = app + .db + .read_relayer_tx(relayer_id, &tx_id) + .await? + .ok_or_else(|| { + poem::error::Error::from_string( + "Transaction not found".to_string(), + StatusCode::NOT_FOUND, + ) + })?; let get_tx_response = GetTxResponse { tx_id: tx.tx_id, @@ -318,13 +324,17 @@ impl RelayerApi { api_token.validate(app).await?; let txs = if unsent { - app.db.read_txs(api_token.relayer_id(), Some(None)).await? + app.db + .read_relayer_txs(api_token.relayer_id(), Some(None)) + .await? } else if let Some(status) = status { app.db - .read_txs(api_token.relayer_id(), Some(Some(status))) + .read_relayer_txs(api_token.relayer_id(), Some(Some(status))) .await? } else { - app.db.read_txs(api_token.relayer_id(), None).await? + app.db + .read_relayer_txs(api_token.relayer_id(), None) + .await? }; let txs = txs