diff --git a/crates/sui-bridge-cli/src/lib.rs b/crates/sui-bridge-cli/src/lib.rs index a08b5130905735..77981277d28017 100644 --- a/crates/sui-bridge-cli/src/lib.rs +++ b/crates/sui-bridge-cli/src/lib.rs @@ -102,8 +102,11 @@ pub enum BridgeCommand { /// Print current committee info #[clap(name = "print-bridge-committee-info")] PrintBridgeCommitteeInfo { + #[clap(long = "sui-rpc-url")] sui_rpc_url: String, #[clap(long, default_value = "false")] + hex: bool, + #[clap(long, default_value = "false")] ping: bool, }, /// Client to facilitate and execute Bridge actions @@ -213,7 +216,7 @@ pub fn make_action(chain_id: BridgeChainId, cmd: &GovernanceClientCommands) -> B nonce: *nonce, chain_id, blocklist_type: *blocklist_type, - blocklisted_members: pubkeys_hex.clone(), + members_to_update: pubkeys_hex.clone(), }), GovernanceClientCommands::UpdateLimit { nonce, diff --git a/crates/sui-bridge-cli/src/main.rs b/crates/sui-bridge-cli/src/main.rs index cee88ae8a2c6d5..5bd23a679a84c8 100644 --- a/crates/sui-bridge-cli/src/main.rs +++ b/crates/sui-bridge-cli/src/main.rs @@ -151,6 +151,7 @@ async fn main() -> anyhow::Result<()> { let eth_action = make_action(chain_id, &cmd); println!("Action to execute on Eth: {:?}", eth_action); // Create Eth Signer Client + // TODO if a validator is blocklisted on eth, ignore their signatures? let certified_action = agg .request_committee_signatures(eth_action) .await @@ -275,7 +276,11 @@ async fn main() -> anyhow::Result<()> { } } - BridgeCommand::PrintBridgeCommitteeInfo { sui_rpc_url, ping } => { + BridgeCommand::PrintBridgeCommitteeInfo { + sui_rpc_url, + hex, + ping, + } => { let sui_bridge_client = SuiClient::::new(&sui_rpc_url).await?; let bridge_summary = sui_bridge_client .get_bridge_summary() @@ -371,6 +376,11 @@ async fn main() -> anyhow::Result<()> { for ((name, sui_address, pubkey, eth_address, url, stake, blocklisted), ping_resp) in authorities.into_iter().zip(ping_tasks_resp) { + let pubkey = if hex { + Hex::encode(pubkey.as_bytes()) + } else { + pubkey.to_string() + }; match ping_resp { Some(resp) => { if resp { @@ -381,7 +391,7 @@ async fn main() -> anyhow::Result<()> { name, sui_address, eth_address, - Hex::encode(pubkey.as_bytes()), + pubkey, url, stake, blocklisted, diff --git a/crates/sui-bridge-indexer/src/eth_worker.rs b/crates/sui-bridge-indexer/src/eth_worker.rs index 17647e99996025..9f68114ba4a12b 100644 --- a/crates/sui-bridge-indexer/src/eth_worker.rs +++ b/crates/sui-bridge-indexer/src/eth_worker.rs @@ -13,7 +13,6 @@ use ethers::providers::{Http, Middleware}; use ethers::types::Address as EthAddress; use mysten_metrics::spawn_logged_monitored_task; use std::collections::HashMap; -use std::collections::HashSet; use std::str::FromStr; use std::sync::Arc; use sui_bridge::abi::{EthBridgeEvent, EthSuiBridgeEvents}; @@ -54,16 +53,10 @@ impl EthBridgeWorker { }) } - pub async fn start_indexing_finalized_events(&self) -> Result> { - let eth_client = Arc::new( - EthClient::::new( - &self.config.eth_rpc_url, - HashSet::from_iter(vec![self.bridge_address]), - ) - .await - .map_err(|e| anyhow::anyhow!(e.to_string()))?, - ); - + pub async fn start_indexing_finalized_events( + &self, + eth_client: Arc>, + ) -> Result> { let newest_finalized_block = match get_latest_eth_token_transfer(&self.pg_pool, true)? { Some(transfer) => transfer.block_height as u64, None => self.config.start_block, @@ -96,15 +89,10 @@ impl EthBridgeWorker { )) } - pub async fn start_indexing_unfinalized_events(&self) -> Result> { - let eth_client = Arc::new( - EthClient::::new( - &self.config.eth_rpc_url, - HashSet::from_iter(vec![self.bridge_address]), - ) - .await?, - ); - + pub async fn start_indexing_unfinalized_events( + &self, + eth_client: Arc>, + ) -> Result> { let newest_unfinalized_block_recorded = match get_latest_eth_token_transfer(&self.pg_pool, false)? { Some(transfer) => transfer.block_height as u64, @@ -145,6 +133,10 @@ impl EthBridgeWorker { "unfinalized indexer handler" )) } + + pub fn bridge_address(&self) -> EthAddress { + self.bridge_address + } } async fn process_eth_events( diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index af8349fe9a744c..977a69bd583fe1 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -5,9 +5,12 @@ use anyhow::Result; use clap::*; use mysten_metrics::start_prometheus_server; use prometheus::Registry; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::env; use std::path::PathBuf; +use std::sync::Arc; +use sui_bridge::eth_client::EthClient; +use sui_bridge::metrics::BridgeMetrics; use sui_bridge_indexer::eth_worker::EthBridgeWorker; use sui_bridge_indexer::postgres_manager::{get_connection_pool, PgProgressStore}; use sui_bridge_indexer::sui_worker::SuiBridgeWorker; @@ -60,20 +63,31 @@ async fn main() -> Result<()> { ); let indexer_meterics = BridgeIndexerMetrics::new(®istry); let ingestion_metrics = DataIngestionMetrics::new(®istry); + let bridge_metrics = Arc::new(BridgeMetrics::new(®istry)); // unwrap safe: db_url must be set in `load_config` above let db_url = config.db_url.clone().unwrap(); + // TODO: retry_with_max_elapsed_time let eth_worker = EthBridgeWorker::new( get_connection_pool(db_url.clone()), indexer_meterics.clone(), - config, + config.clone(), + ) + .unwrap(); + + let eth_client = Arc::new( + EthClient::::new( + &config.eth_rpc_url, + HashSet::from_iter(vec![eth_worker.bridge_address()]), + bridge_metrics, + ) + .await + .map_err(|e| anyhow::anyhow!(e.to_string()))?, ); - // TODO: retry_with_max_elapsed_time - let eth_worker_binding = eth_worker.unwrap(); - let unfinalized_handle = eth_worker_binding.start_indexing_unfinalized_events(); - let finalized_handle = eth_worker_binding.start_indexing_finalized_events(); + let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone()); + let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone()); // TODO: add retry_with_max_elapsed_time let progress = start_processing_sui_checkpoints( diff --git a/crates/sui-bridge/src/abi.rs b/crates/sui-bridge/src/abi.rs index 38f4c1e477fea0..e60d119877936f 100644 --- a/crates/sui-bridge/src/abi.rs +++ b/crates/sui-bridge/src/abi.rs @@ -332,7 +332,7 @@ mod tests { nonce: 0, chain_id: BridgeChainId::EthSepolia, blocklist_type: BlocklistType::Blocklist, - blocklisted_members: vec![pub_key_bytes], + members_to_update: vec![pub_key_bytes], }; let message: eth_bridge_committee::Message = action.into(); assert_eq!( diff --git a/crates/sui-bridge/src/action_executor.rs b/crates/sui-bridge/src/action_executor.rs index 153548f7d17ef9..e1835feb277091 100644 --- a/crates/sui-bridge/src/action_executor.rs +++ b/crates/sui-bridge/src/action_executor.rs @@ -4,6 +4,7 @@ //! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator), //! collects bridge authority signatures and submit signatures on chain. +use crate::retry_with_max_elapsed_time; use arc_swap::ArcSwap; use mysten_metrics::spawn_logged_monitored_task; use shared_crypto::intent::{Intent, IntentMessage}; @@ -36,9 +37,12 @@ use crate::{ }; use std::collections::HashMap; use std::sync::Arc; +use tokio::sync::Semaphore; +use tokio::time::Duration; use tracing::{error, info, instrument, warn, Instrument}; pub const CHANNEL_SIZE: usize = 1000; +pub const SIGNING_CONCURRENCY: usize = 10; // delay schedule: at most 16 times including the initial attempt // 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, 102.4s, 204.8s, 409.6s, 819.2s, 1638.4s @@ -197,8 +201,10 @@ where metrics: Arc, ) { info!("Starting run_signature_aggregation_loop"); + let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY)); while let Some(action) = signing_queue_receiver.recv().await { Self::handle_signing_task( + &semaphore, &auth_agg, &signing_queue_sender, &execution_queue_sender, @@ -211,8 +217,19 @@ where } } + async fn should_proceed_signing(sui_client: &Arc>) -> bool { + let Ok(Ok(is_paused)) = + retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600)) + else { + error!("Failed to get bridge status after retry"); + return false; + }; + !is_paused + } + #[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))] async fn handle_signing_task( + semaphore: &Arc, auth_agg: &Arc, signing_queue_sender: &mysten_metrics::metered_channel::Sender< BridgeActionExecutionWrapper, @@ -229,22 +246,38 @@ where let action_key = action.0.key(); info!("Received action for signing: {:?}", action.0); + // TODO: this is a temporary fix to avoid signing when the bridge is paused. + // but the way is implemented is not ideal: + // 1. it should check the direction + // 2. should use a better mechanism to check the bridge status instead of polling for each action + let should_proceed = Self::should_proceed_signing(sui_client).await; + if !should_proceed { + metrics.action_executor_signing_queue_skipped_actions.inc(); + warn!("skipping signing task: {:?}", action_key); + return; + } + let auth_agg_clone = auth_agg.clone(); let signing_queue_sender_clone = signing_queue_sender.clone(); let execution_queue_sender_clone = execution_queue_sender.clone(); let sui_client_clone = sui_client.clone(); let store_clone = store.clone(); let metrics_clone = metrics.clone(); - spawn_logged_monitored_task!(Self::request_signature( - sui_client_clone, - auth_agg_clone, - action, - store_clone, - signing_queue_sender_clone, - execution_queue_sender_clone, - metrics_clone, - ) - .instrument(tracing::debug_span!("request_signature", action_key=?action_key))); + let semaphore_clone = semaphore.clone(); + spawn_logged_monitored_task!( + Self::request_signatures( + semaphore_clone, + sui_client_clone, + auth_agg_clone, + action, + store_clone, + signing_queue_sender_clone, + execution_queue_sender_clone, + metrics_clone, + ) + .instrument(tracing::debug_span!("request_signatures", action_key=?action_key)), + "request_signatures" + ); } // Checks if the action is already processed on chain. @@ -254,6 +287,7 @@ where sui_client: &Arc>, action: &BridgeAction, store: &Arc, + metrics: &Arc, ) -> bool { let status = sui_client .get_token_transfer_action_onchain_status_until_success( @@ -267,6 +301,7 @@ where "Action already approved or claimed, removing action from pending logs: {:?}", action ); + metrics.action_executor_already_processed_actions.inc(); store .remove_pending_actions(&[action.digest()]) .unwrap_or_else(|e| { @@ -280,7 +315,10 @@ where } } - async fn request_signature( + // TODO: introduce a way to properly stagger the handling + // for various validators. + async fn request_signatures( + semaphore: Arc, sui_client: Arc>, auth_agg: Arc, action: BridgeActionExecutionWrapper, @@ -291,6 +329,11 @@ where >, metrics: Arc, ) { + let _permit = semaphore + .acquire() + .await + .expect("semaphore should not be closed"); + info!("requesting signatures"); let BridgeActionExecutionWrapper(action, attempt_times) = action; // Only token transfer action should reach here @@ -300,8 +343,13 @@ where }; // If the action is already processed, skip it. - if Self::handle_already_processed_token_transfer_action_maybe(&sui_client, &action, &store) - .await + if Self::handle_already_processed_token_transfer_action_maybe( + &sui_client, + &action, + &store, + &metrics, + ) + .await { return; } @@ -427,8 +475,10 @@ where let ceriticate_clone = certificate.clone(); // Check once: if the action is already processed, skip it. - if Self::handle_already_processed_token_transfer_action_maybe(sui_client, action, store) - .await + if Self::handle_already_processed_token_transfer_action_maybe( + sui_client, action, store, metrics, + ) + .await { info!("Action already processed, skipping"); return; @@ -464,8 +514,10 @@ where let tx_digest = *signed_tx.digest(); // Check twice: If the action is already processed, skip it. - if Self::handle_already_processed_token_transfer_action_maybe(sui_client, action, store) - .await + if Self::handle_already_processed_token_transfer_action_maybe( + sui_client, action, store, metrics, + ) + .await { info!("Action already processed, skipping"); return; diff --git a/crates/sui-bridge/src/client/bridge_client.rs b/crates/sui-bridge/src/client/bridge_client.rs index c4c088a89622f4..08bbd509846724 100644 --- a/crates/sui-bridge/src/client/bridge_client.rs +++ b/crates/sui-bridge/src/client/bridge_client.rs @@ -66,7 +66,7 @@ impl BridgeClient { let nonce = a.nonce.to_string(); let type_ = (a.blocklist_type as u8).to_string(); let keys = a - .blocklisted_members + .members_to_update .iter() .map(|k| Hex::encode(k.as_bytes())) .collect::>() @@ -485,7 +485,7 @@ mod tests { chain_id: BridgeChainId::EthSepolia, nonce: 1, blocklist_type: crate::types::BlocklistType::Blocklist, - blocklisted_members: vec![pub_key_bytes.clone()], + members_to_update: vec![pub_key_bytes.clone()], }); assert_eq!( BridgeClient::bridge_action_to_path(&action), @@ -501,7 +501,7 @@ mod tests { chain_id: BridgeChainId::EthSepolia, nonce: 1, blocklist_type: crate::types::BlocklistType::Blocklist, - blocklisted_members: vec![pub_key_bytes.clone(), pub_key_bytes2.clone()], + members_to_update: vec![pub_key_bytes.clone(), pub_key_bytes2.clone()], }); assert_eq!( BridgeClient::bridge_action_to_path(&action), diff --git a/crates/sui-bridge/src/config.rs b/crates/sui-bridge/src/config.rs index f8ed6862814672..57d738bf56ddf8 100644 --- a/crates/sui-bridge/src/config.rs +++ b/crates/sui-bridge/src/config.rs @@ -5,6 +5,7 @@ use crate::abi::EthBridgeConfig; use crate::crypto::BridgeAuthorityKeyPair; use crate::error::BridgeError; use crate::eth_client::EthClient; +use crate::metrics::BridgeMetrics; use crate::sui_client::SuiClient; use crate::types::{is_route_valid, BridgeAction}; use crate::utils::get_eth_contract_addresses; @@ -118,6 +119,7 @@ impl Config for BridgeNodeConfig {} impl BridgeNodeConfig { pub async fn validate( &self, + metrics: Arc, ) -> anyhow::Result<(BridgeServerConfig, Option)> { if !is_route_valid( BridgeChainId::try_from(self.sui.sui_bridge_chain_id)?, @@ -148,7 +150,7 @@ impl BridgeNodeConfig { )); } - let (eth_client, eth_contracts) = self.prepare_for_eth().await?; + let (eth_client, eth_contracts) = self.prepare_for_eth(metrics).await?; let bridge_summary = sui_client .get_bridge_summary() .await @@ -218,6 +220,7 @@ impl BridgeNodeConfig { async fn prepare_for_eth( &self, + metrics: Arc, ) -> anyhow::Result<(Arc>, Vec)> { let bridge_proxy_address = EthAddress::from_str(&self.eth.eth_bridge_proxy_address)?; let provider = Arc::new( @@ -274,6 +277,7 @@ impl BridgeNodeConfig { limiter_address, vault_address, ]), + metrics, ) .await?, ); diff --git a/crates/sui-bridge/src/encoding.rs b/crates/sui-bridge/src/encoding.rs index 3e3d38c4496110..5331ca0c70c274 100644 --- a/crates/sui-bridge/src/encoding.rs +++ b/crates/sui-bridge/src/encoding.rs @@ -154,12 +154,12 @@ impl BridgeMessageEncoding for BlocklistCommitteeAction { bytes.push(self.blocklist_type as u8); // Add length of updated members. // Unwrap: It should not overflow given what we have today. - bytes.push(u8::try_from(self.blocklisted_members.len()).unwrap()); + bytes.push(u8::try_from(self.members_to_update.len()).unwrap()); // Add list of updated members // Members are represented as pubkey dervied evm addresses (20 bytes) let members_bytes = self - .blocklisted_members + .members_to_update .iter() .map(|m| m.to_eth_address().to_fixed_bytes().to_vec()) .collect::>(); @@ -554,7 +554,7 @@ mod tests { nonce: 129, chain_id: BridgeChainId::SuiCustom, blocklist_type: BlocklistType::Blocklist, - blocklisted_members: vec![pub_key_bytes.clone()], + members_to_update: vec![pub_key_bytes.clone()], }); let bytes = blocklist_action.to_bytes(); /* @@ -581,7 +581,7 @@ mod tests { nonce: 68, chain_id: BridgeChainId::SuiCustom, blocklist_type: BlocklistType::Unblocklist, - blocklisted_members: vec![pub_key_bytes.clone(), pub_key_bytes_2.clone()], + members_to_update: vec![pub_key_bytes.clone(), pub_key_bytes_2.clone()], }); let bytes = blocklist_action.to_bytes(); /* @@ -603,7 +603,7 @@ mod tests { nonce: 49, chain_id: BridgeChainId::EthCustom, blocklist_type: BlocklistType::Blocklist, - blocklisted_members: vec![pub_key_bytes.clone()], + members_to_update: vec![pub_key_bytes.clone()], }); let bytes = blocklist_action.to_bytes(); /* @@ -624,7 +624,7 @@ mod tests { nonce: 94, chain_id: BridgeChainId::EthSepolia, blocklist_type: BlocklistType::Unblocklist, - blocklisted_members: vec![pub_key_bytes.clone(), pub_key_bytes_2.clone()], + members_to_update: vec![pub_key_bytes.clone(), pub_key_bytes_2.clone()], }); let bytes = blocklist_action.to_bytes(); /* diff --git a/crates/sui-bridge/src/eth_client.rs b/crates/sui-bridge/src/eth_client.rs index 6c35c2b26c750f..391918c8796610 100644 --- a/crates/sui-bridge/src/eth_client.rs +++ b/crates/sui-bridge/src/eth_client.rs @@ -2,14 +2,16 @@ // SPDX-License-Identifier: Apache-2.0 use std::collections::HashSet; +use std::sync::Arc; use crate::abi::EthBridgeEvent; use crate::error::{BridgeError, BridgeResult}; +use crate::metrics::BridgeMetrics; use crate::types::{BridgeAction, EthLog}; use ethers::providers::{Http, JsonRpcClient, Middleware, Provider}; use ethers::types::TxHash; use ethers::types::{Block, Filter}; -use tap::TapFallible; +use tap::{Tap, TapFallible}; #[cfg(test)] use crate::eth_mock_provider::EthMockProvider; @@ -17,17 +19,20 @@ use ethers::types::Address as EthAddress; pub struct EthClient

{ provider: Provider

, contract_addresses: HashSet, + metrics: Arc, } impl EthClient { pub async fn new( provider_url: &str, contract_addresses: HashSet, + metrics: Arc, ) -> anyhow::Result { let provider = Provider::try_from(provider_url)?; let self_ = Self { provider, contract_addresses, + metrics, }; self_.describe().await?; Ok(self_) @@ -41,6 +46,7 @@ impl EthClient { Self { provider, contract_addresses, + metrics: Arc::new(BridgeMetrics::new_for_testing()), } } } @@ -71,11 +77,13 @@ where .provider .get_transaction_receipt(tx_hash) .await + .tap(|_| self.metrics.eth_provider_queries.inc()) .map_err(BridgeError::from)? .ok_or(BridgeError::TxNotFound)?; let receipt_block_num = receipt.block_number.ok_or(BridgeError::ProviderError( "Provider returns log without block_number".into(), ))?; + // TODO: save the latest finalized block id so we don't have to query it every time let last_finalized_block_id = self.get_last_finalized_block_id().await?; if receipt_block_num.as_u64() > last_finalized_block_id { return Err(BridgeError::TxNotFinalized); @@ -107,7 +115,8 @@ where let block: Result>, ethers::prelude::ProviderError> = self.provider .request("eth_getBlockByNumber", ("finalized", false)) - .await; + .await + .tap(|_| self.metrics.eth_provider_queries.inc()); let block = block?.ok_or(BridgeError::TransientProviderError( "Provider fails to return last finalized block".into(), ))?; @@ -133,6 +142,7 @@ where .provider .get_logs(&filter) .await + .tap(|_| self.metrics.eth_provider_queries.inc()) .map_err(BridgeError::from) .tap_err(|e| { tracing::error!( @@ -186,6 +196,7 @@ where .provider .get_transaction_receipt(tx_hash) .await + .tap(|_| self.metrics.eth_provider_queries.inc()) .map_err(BridgeError::from)? .ok_or(BridgeError::ProviderError(format!( "Provide cannot find eth transaction for log: {:?})", diff --git a/crates/sui-bridge/src/eth_syncer.rs b/crates/sui-bridge/src/eth_syncer.rs index dad74cf82b7141..dc73e05809ddde 100644 --- a/crates/sui-bridge/src/eth_syncer.rs +++ b/crates/sui-bridge/src/eth_syncer.rs @@ -21,7 +21,7 @@ use tracing::error; const ETH_LOG_QUERY_MAX_BLOCK_RANGE: u64 = 1000; const ETH_EVENTS_CHANNEL_SIZE: usize = 1000; -const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(2); +const FINALIZED_BLOCK_QUERY_INTERVAL: Duration = Duration::from_secs(5); pub struct EthSyncer

{ eth_client: Arc>, @@ -92,9 +92,10 @@ where interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); loop { interval.tick().await; + // TODO: allow to pass custom initial interval let Ok(Ok(new_value)) = retry_with_max_elapsed_time!( eth_client.get_last_finalized_block_id(), - time::Duration::from_secs(10) + time::Duration::from_secs(600) ) else { error!("Failed to get last finalized block from eth client after retry"); continue; @@ -114,6 +115,7 @@ where } // TODO: define a type for block number for readability + // TODO: add a metrics for current start block async fn run_event_listening_task( contract_address: EthAddress, mut start_block: u64, @@ -149,7 +151,7 @@ where more_blocks = end_block < new_finalized_block; let Ok(Ok(events)) = retry_with_max_elapsed_time!( eth_client.get_events_in_range(contract_address, start_block, end_block), - Duration::from_secs(30) + Duration::from_secs(600) ) else { error!("Failed to get events from eth client after retry"); continue; diff --git a/crates/sui-bridge/src/events.rs b/crates/sui-bridge/src/events.rs index 5ba50f1d598920..1d0a26adaff125 100644 --- a/crates/sui-bridge/src/events.rs +++ b/crates/sui-bridge/src/events.rs @@ -267,7 +267,7 @@ impl TryFrom for EmittedSuiToEthTokenBridgeV1 { fn try_from(event: MoveTokenDepositedEvent) -> BridgeResult { if event.amount_sui_adjusted == 0 { - return Err(BridgeError::Generic(format!( + return Err(BridgeError::ZeroValueBridgeTransfer(format!( "Failed to convert MoveTokenDepositedEvent to EmittedSuiToEthTokenBridgeV1. Manual intervention is required. 0 value transfer should not be allowed in Move: {:?}", event, ))); @@ -516,9 +516,7 @@ pub mod tests { amount_sui_adjusted: 0, }; match EmittedSuiToEthTokenBridgeV1::try_from(emitted_event).unwrap_err() { - BridgeError::Generic(err) => { - assert!(err.contains("0 value transfer should not be allowed in Move")); - } + BridgeError::ZeroValueBridgeTransfer(_) => (), other => panic!("Expected Generic error, got: {:?}", other), } } diff --git a/crates/sui-bridge/src/lib.rs b/crates/sui-bridge/src/lib.rs index bd52a06279e734..bf1b679abb544d 100644 --- a/crates/sui-bridge/src/lib.rs +++ b/crates/sui-bridge/src/lib.rs @@ -41,9 +41,9 @@ pub mod e2e_tests; macro_rules! retry_with_max_elapsed_time { ($func:expr, $max_elapsed_time:expr) => {{ // The following delay sequence (in secs) will be used, applied with jitter - // 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, 12.8, 25.6, 30, 30, 30 ... + // 0.4, 0.8, 1.6, 3.2, 6.4, 12.8, 25.6, 30, 30, 30 ... let backoff = backoff::ExponentialBackoff { - initial_interval: Duration::from_millis(100), + initial_interval: Duration::from_millis(400), randomization_factor: 0.1, multiplier: 2.0, max_interval: Duration::from_secs(30), @@ -80,11 +80,13 @@ mod tests { } async fn example_func_err() -> anyhow::Result<()> { + tracing::info!("example_func_err"); Err(anyhow::anyhow!("")) } #[tokio::test] async fn test_retry_with_max_elapsed_time() { + telemetry_subscribers::init_for_testing(); // no retry is needed, should return immediately. We give it a very small // max_elapsed_time and it should still finish in time. let max_elapsed_time = Duration::from_millis(20); @@ -93,7 +95,7 @@ mod tests { .unwrap(); // now call a function that always errors and expect it to return before max_elapsed_time runs out - let max_elapsed_time = Duration::from_secs(4); + let max_elapsed_time = Duration::from_secs(10); let instant = std::time::Instant::now(); retry_with_max_elapsed_time!(example_func_err(), max_elapsed_time).unwrap_err(); assert!(instant.elapsed() < max_elapsed_time); diff --git a/crates/sui-bridge/src/metrics.rs b/crates/sui-bridge/src/metrics.rs index 007a4a7860ff2a..2765b57d026e47 100644 --- a/crates/sui-bridge/src/metrics.rs +++ b/crates/sui-bridge/src/metrics.rs @@ -25,9 +25,13 @@ pub struct BridgeMetrics { pub(crate) eth_watcher_received_events: IntCounter, pub(crate) eth_watcher_received_actions: IntCounter, pub(crate) eth_watcher_unrecognized_events: IntCounter, + pub(crate) action_executor_already_processed_actions: IntCounter, pub(crate) action_executor_signing_queue_received_actions: IntCounter, + pub(crate) action_executor_signing_queue_skipped_actions: IntCounter, pub(crate) action_executor_execution_queue_received_actions: IntCounter, + pub(crate) eth_provider_queries: IntCounter, + pub(crate) gas_coin_balance: IntGauge, } @@ -128,12 +132,24 @@ impl BridgeMetrics { registry, ) .unwrap(), + action_executor_already_processed_actions: register_int_counter_with_registry!( + "bridge_action_executor_already_processed_actions", + "Total number of already processed actions action executor", + registry, + ) + .unwrap(), action_executor_signing_queue_received_actions: register_int_counter_with_registry!( "bridge_action_executor_signing_queue_received_actions", "Total number of received actions in action executor signing queue", registry, ) .unwrap(), + action_executor_signing_queue_skipped_actions: register_int_counter_with_registry!( + "bridge_action_executor_signing_queue_skipped_actions", + "Total number of skipped actions in action executor signing queue", + registry, + ) + .unwrap(), action_executor_execution_queue_received_actions: register_int_counter_with_registry!( "bridge_action_executor_execution_queue_received_actions", "Total number of received actions in action executor execution queue", @@ -146,6 +162,12 @@ impl BridgeMetrics { registry, ) .unwrap(), + eth_provider_queries: register_int_counter_with_registry!( + "bridge_eth_provider_queries", + "Total number of queries issued to eth provider", + registry, + ) + .unwrap(), } } diff --git a/crates/sui-bridge/src/node.rs b/crates/sui-bridge/src/node.rs index e2a98770dbf0d3..3ce44ad64c11ee 100644 --- a/crates/sui-bridge/src/node.rs +++ b/crates/sui-bridge/src/node.rs @@ -35,7 +35,7 @@ pub async fn run_bridge_node( ) -> anyhow::Result> { init_all_struct_tags(); let metrics = Arc::new(BridgeMetrics::new(&prometheus_registry)); - let (server_config, client_config) = config.validate().await?; + let (server_config, client_config) = config.validate(metrics.clone()).await?; // Start Client let _handles = if let Some(client_config) = client_config { diff --git a/crates/sui-bridge/src/orchestrator.rs b/crates/sui-bridge/src/orchestrator.rs index 78128df1c5c6a3..b519b43573ff54 100644 --- a/crates/sui-bridge/src/orchestrator.rs +++ b/crates/sui-bridge/src/orchestrator.rs @@ -10,7 +10,7 @@ use crate::abi::EthBridgeEvent; use crate::action_executor::{ submit_to_executor, BridgeActionExecutionWrapper, BridgeActionExecutorTrait, }; -use crate::error::BridgeResult; +use crate::error::BridgeError; use crate::events::SuiBridgeEvent; use crate::metrics::BridgeMetrics; use crate::storage::BridgeOrchestratorTables; @@ -120,9 +120,23 @@ where .inc_by(events.len() as u64); let bridge_events = events .iter() - .map(SuiBridgeEvent::try_from_sui_event) - .collect::>>() - .expect("Sui Event could not be deserialzed to SuiBridgeEvent"); + .filter_map(|sui_event| { + match SuiBridgeEvent::try_from_sui_event(sui_event) { + Ok(bridge_event) => Some(bridge_event), + // On testnet some early bridge transactions could have zero value (before we disallow it in Move) + Err(BridgeError::ZeroValueBridgeTransfer(_)) => { + error!("Zero value bridge transfer: {:?}", sui_event); + None + } + Err(e) => { + panic!( + "Sui Event could not be deserialzed to SuiBridgeEvent: {:?}", + e + ); + } + } + }) + .collect::>(); let mut actions = vec![]; for (sui_event, opt_bridge_event) in events.iter().zip(bridge_events) { @@ -205,7 +219,7 @@ where continue; } - info!("Received {} Eth events: {:?}", logs.len(), logs); + info!("Received {} Eth events", logs.len()); metrics .eth_watcher_received_events .inc_by(logs.len() as u64); diff --git a/crates/sui-bridge/src/server/mod.rs b/crates/sui-bridge/src/server/mod.rs index 1f5fe70bf52260..6f73472fbe3625 100644 --- a/crates/sui-bridge/src/server/mod.rs +++ b/crates/sui-bridge/src/server/mod.rs @@ -207,7 +207,7 @@ async fn handle_update_committee_blocklist_action( err )) })?; - let blocklisted_members = keys + let members_to_update = keys .split(',') .map(|s| { let bytes = Hex::decode(s).map_err(|e| anyhow::anyhow!("{:?}", e))?; @@ -220,7 +220,7 @@ async fn handle_update_committee_blocklist_action( chain_id, nonce, blocklist_type, - blocklisted_members, + members_to_update, }); let sig: Json = handler.handle_governance_action(action).await?; @@ -626,7 +626,7 @@ mod tests { nonce: 129, chain_id: BridgeChainId::SuiCustom, blocklist_type: BlocklistType::Blocklist, - blocklisted_members: vec![pub_key_bytes.clone()], + members_to_update: vec![pub_key_bytes.clone()], }); client.request_sign_bridge_action(action).await.unwrap(); } diff --git a/crates/sui-bridge/src/sui_client.rs b/crates/sui-bridge/src/sui_client.rs index 6cec2440a50fb9..71012a2de4f385 100644 --- a/crates/sui-bridge/src/sui_client.rs +++ b/crates/sui-bridge/src/sui_client.rs @@ -167,6 +167,12 @@ where .map_err(|e| BridgeError::InternalError(format!("Can't get bridge committee: {e}"))) } + pub async fn is_bridge_paused(&self) -> BridgeResult { + self.get_bridge_summary() + .await + .map(|summary| summary.is_frozen) + } + pub async fn get_treasury_summary(&self) -> BridgeResult { Ok(self.get_bridge_summary().await?.treasury) } diff --git a/crates/sui-bridge/src/sui_syncer.rs b/crates/sui-bridge/src/sui_syncer.rs index 1051f16099a1ec..55ef52847287b3 100644 --- a/crates/sui-bridge/src/sui_syncer.rs +++ b/crates/sui-bridge/src/sui_syncer.rs @@ -93,7 +93,7 @@ where interval.tick().await; let Ok(Ok(events)) = retry_with_max_elapsed_time!( sui_client.query_events_by_module(BRIDGE_PACKAGE_ID, module.clone(), cursor), - Duration::from_secs(10) + Duration::from_secs(120) ) else { tracing::error!("Failed to query events from sui client after retry"); continue; diff --git a/crates/sui-bridge/src/sui_transaction_builder.rs b/crates/sui-bridge/src/sui_transaction_builder.rs index f6cb2d1a3ce9c2..3b1aab01b6f3da 100644 --- a/crates/sui-bridge/src/sui_transaction_builder.rs +++ b/crates/sui-bridge/src/sui_transaction_builder.rs @@ -289,9 +289,9 @@ fn build_committee_blocklist_approve_transaction( let mut builder = ProgrammableTransactionBuilder::new(); - let (source_chain, seq_num, blocklist_type, blocklisted_members) = match bridge_action { + let (source_chain, seq_num, blocklist_type, members_to_update) = match bridge_action { BridgeAction::BlocklistCommitteeAction(a) => { - (a.chain_id, a.nonce, a.blocklist_type, a.blocklisted_members) + (a.chain_id, a.nonce, a.blocklist_type, a.members_to_update) } _ => unreachable!(), }; @@ -300,11 +300,11 @@ fn build_committee_blocklist_approve_transaction( let source_chain = builder.pure(source_chain as u8).unwrap(); let seq_num = builder.pure(seq_num).unwrap(); let blocklist_type = builder.pure(blocklist_type as u8).unwrap(); - let blocklisted_members = blocklisted_members + let members_to_update = members_to_update .into_iter() .map(|m| m.to_eth_address().as_bytes().to_vec()) .collect::>(); - let blocklisted_members = builder.pure(blocklisted_members).unwrap(); + let members_to_update = builder.pure(members_to_update).unwrap(); let arg_bridge = builder.obj(bridge_object_arg).unwrap(); let arg_msg = builder.programmable_move_call( @@ -312,7 +312,7 @@ fn build_committee_blocklist_approve_transaction( ident_str!("message").to_owned(), ident_str!("create_blocklist_message").to_owned(), vec![], - vec![source_chain, seq_num, blocklist_type, blocklisted_members], + vec![source_chain, seq_num, blocklist_type, members_to_update], ); let mut sig_bytes = vec![]; @@ -820,7 +820,7 @@ mod tests { nonce: 0, chain_id: BridgeChainId::SuiCustom, blocklist_type: BlocklistType::Blocklist, - blocklisted_members: vec![BridgeAuthorityPublicKeyBytes::from_bytes( + members_to_update: vec![BridgeAuthorityPublicKeyBytes::from_bytes( &victim.bridge_pubkey_bytes, ) .unwrap()], @@ -849,7 +849,7 @@ mod tests { nonce: 1, chain_id: BridgeChainId::SuiCustom, blocklist_type: BlocklistType::Unblocklist, - blocklisted_members: vec![BridgeAuthorityPublicKeyBytes::from_bytes( + members_to_update: vec![BridgeAuthorityPublicKeyBytes::from_bytes( &victim.bridge_pubkey_bytes, ) .unwrap()], diff --git a/crates/sui-bridge/src/types.rs b/crates/sui-bridge/src/types.rs index 362cf56c8dd41b..07a44d85b6d5b8 100644 --- a/crates/sui-bridge/src/types.rs +++ b/crates/sui-bridge/src/types.rs @@ -244,8 +244,7 @@ pub struct BlocklistCommitteeAction { pub nonce: u64, pub chain_id: BridgeChainId, pub blocklist_type: BlocklistType, - // TODO: rename this to `members_to_update` - pub blocklisted_members: Vec, + pub members_to_update: Vec, } #[derive( @@ -625,7 +624,7 @@ mod tests { nonce: 94, chain_id: BridgeChainId::EthSepolia, blocklist_type: BlocklistType::Unblocklist, - blocklisted_members: vec![], + members_to_update: vec![], }); assert_eq!(action.approval_threshold(), 5001);