Skip to content

Commit

Permalink
[bridge] some fixes to bridge node (MystenLabs#18322)
Browse files Browse the repository at this point in the history
## Description 

Describe the changes or additions included in this PR.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
longbowlu authored and tx-tomcat committed Jul 29, 2024
1 parent c0d5622 commit 99cf69e
Show file tree
Hide file tree
Showing 21 changed files with 218 additions and 89 deletions.
5 changes: 4 additions & 1 deletion crates/sui-bridge-cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,11 @@ pub enum BridgeCommand {
/// Print current committee info
#[clap(name = "print-bridge-committee-info")]
PrintBridgeCommitteeInfo {
#[clap(long = "sui-rpc-url")]
sui_rpc_url: String,
#[clap(long, default_value = "false")]
hex: bool,
#[clap(long, default_value = "false")]
ping: bool,
},
/// Client to facilitate and execute Bridge actions
Expand Down Expand Up @@ -213,7 +216,7 @@ pub fn make_action(chain_id: BridgeChainId, cmd: &GovernanceClientCommands) -> B
nonce: *nonce,
chain_id,
blocklist_type: *blocklist_type,
blocklisted_members: pubkeys_hex.clone(),
members_to_update: pubkeys_hex.clone(),
}),
GovernanceClientCommands::UpdateLimit {
nonce,
Expand Down
14 changes: 12 additions & 2 deletions crates/sui-bridge-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ async fn main() -> anyhow::Result<()> {
let eth_action = make_action(chain_id, &cmd);
println!("Action to execute on Eth: {:?}", eth_action);
// Create Eth Signer Client
// TODO if a validator is blocklisted on eth, ignore their signatures?
let certified_action = agg
.request_committee_signatures(eth_action)
.await
Expand Down Expand Up @@ -275,7 +276,11 @@ async fn main() -> anyhow::Result<()> {
}
}

BridgeCommand::PrintBridgeCommitteeInfo { sui_rpc_url, ping } => {
BridgeCommand::PrintBridgeCommitteeInfo {
sui_rpc_url,
hex,
ping,
} => {
let sui_bridge_client = SuiClient::<SuiSdkClient>::new(&sui_rpc_url).await?;
let bridge_summary = sui_bridge_client
.get_bridge_summary()
Expand Down Expand Up @@ -371,6 +376,11 @@ async fn main() -> anyhow::Result<()> {
for ((name, sui_address, pubkey, eth_address, url, stake, blocklisted), ping_resp) in
authorities.into_iter().zip(ping_tasks_resp)
{
let pubkey = if hex {
Hex::encode(pubkey.as_bytes())
} else {
pubkey.to_string()
};
match ping_resp {
Some(resp) => {
if resp {
Expand All @@ -381,7 +391,7 @@ async fn main() -> anyhow::Result<()> {
name,
sui_address,
eth_address,
Hex::encode(pubkey.as_bytes()),
pubkey,
url,
stake,
blocklisted,
Expand Down
32 changes: 12 additions & 20 deletions crates/sui-bridge-indexer/src/eth_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use ethers::providers::{Http, Middleware};
use ethers::types::Address as EthAddress;
use mysten_metrics::spawn_logged_monitored_task;
use std::collections::HashMap;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use sui_bridge::abi::{EthBridgeEvent, EthSuiBridgeEvents};
Expand Down Expand Up @@ -54,16 +53,10 @@ impl EthBridgeWorker {
})
}

pub async fn start_indexing_finalized_events(&self) -> Result<JoinHandle<()>> {
let eth_client = Arc::new(
EthClient::<ethers::providers::Http>::new(
&self.config.eth_rpc_url,
HashSet::from_iter(vec![self.bridge_address]),
)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?,
);

pub async fn start_indexing_finalized_events(
&self,
eth_client: Arc<EthClient<ethers::providers::Http>>,
) -> Result<JoinHandle<()>> {
let newest_finalized_block = match get_latest_eth_token_transfer(&self.pg_pool, true)? {
Some(transfer) => transfer.block_height as u64,
None => self.config.start_block,
Expand Down Expand Up @@ -96,15 +89,10 @@ impl EthBridgeWorker {
))
}

pub async fn start_indexing_unfinalized_events(&self) -> Result<JoinHandle<()>> {
let eth_client = Arc::new(
EthClient::<ethers::providers::Http>::new(
&self.config.eth_rpc_url,
HashSet::from_iter(vec![self.bridge_address]),
)
.await?,
);

pub async fn start_indexing_unfinalized_events(
&self,
eth_client: Arc<EthClient<ethers::providers::Http>>,
) -> Result<JoinHandle<()>> {
let newest_unfinalized_block_recorded =
match get_latest_eth_token_transfer(&self.pg_pool, false)? {
Some(transfer) => transfer.block_height as u64,
Expand Down Expand Up @@ -145,6 +133,10 @@ impl EthBridgeWorker {
"unfinalized indexer handler"
))
}

pub fn bridge_address(&self) -> EthAddress {
self.bridge_address
}
}

async fn process_eth_events(
Expand Down
26 changes: 20 additions & 6 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use anyhow::Result;
use clap::*;
use mysten_metrics::start_prometheus_server;
use prometheus::Registry;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::env;
use std::path::PathBuf;
use std::sync::Arc;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metrics::BridgeMetrics;
use sui_bridge_indexer::eth_worker::EthBridgeWorker;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, PgProgressStore};
use sui_bridge_indexer::sui_worker::SuiBridgeWorker;
Expand Down Expand Up @@ -60,20 +63,31 @@ async fn main() -> Result<()> {
);
let indexer_meterics = BridgeIndexerMetrics::new(&registry);
let ingestion_metrics = DataIngestionMetrics::new(&registry);
let bridge_metrics = Arc::new(BridgeMetrics::new(&registry));

// unwrap safe: db_url must be set in `load_config` above
let db_url = config.db_url.clone().unwrap();

// TODO: retry_with_max_elapsed_time
let eth_worker = EthBridgeWorker::new(
get_connection_pool(db_url.clone()),
indexer_meterics.clone(),
config,
config.clone(),
)
.unwrap();

let eth_client = Arc::new(
EthClient::<ethers::providers::Http>::new(
&config.eth_rpc_url,
HashSet::from_iter(vec![eth_worker.bridge_address()]),
bridge_metrics,
)
.await
.map_err(|e| anyhow::anyhow!(e.to_string()))?,
);

// TODO: retry_with_max_elapsed_time
let eth_worker_binding = eth_worker.unwrap();
let unfinalized_handle = eth_worker_binding.start_indexing_unfinalized_events();
let finalized_handle = eth_worker_binding.start_indexing_finalized_events();
let unfinalized_handle = eth_worker.start_indexing_unfinalized_events(eth_client.clone());
let finalized_handle = eth_worker.start_indexing_finalized_events(eth_client.clone());

// TODO: add retry_with_max_elapsed_time
let progress = start_processing_sui_checkpoints(
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-bridge/src/abi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ mod tests {
nonce: 0,
chain_id: BridgeChainId::EthSepolia,
blocklist_type: BlocklistType::Blocklist,
blocklisted_members: vec![pub_key_bytes],
members_to_update: vec![pub_key_bytes],
};
let message: eth_bridge_committee::Message = action.into();
assert_eq!(
Expand Down
86 changes: 69 additions & 17 deletions crates/sui-bridge/src/action_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//! BridgeActionExecutor receives BridgeActions (from BridgeOrchestrator),
//! collects bridge authority signatures and submit signatures on chain.

use crate::retry_with_max_elapsed_time;
use arc_swap::ArcSwap;
use mysten_metrics::spawn_logged_monitored_task;
use shared_crypto::intent::{Intent, IntentMessage};
Expand Down Expand Up @@ -36,9 +37,12 @@ use crate::{
};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::time::Duration;
use tracing::{error, info, instrument, warn, Instrument};

pub const CHANNEL_SIZE: usize = 1000;
pub const SIGNING_CONCURRENCY: usize = 10;

// delay schedule: at most 16 times including the initial attempt
// 0.1s, 0.2s, 0.4s, 0.8s, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, 102.4s, 204.8s, 409.6s, 819.2s, 1638.4s
Expand Down Expand Up @@ -197,8 +201,10 @@ where
metrics: Arc<BridgeMetrics>,
) {
info!("Starting run_signature_aggregation_loop");
let semaphore = Arc::new(Semaphore::new(SIGNING_CONCURRENCY));
while let Some(action) = signing_queue_receiver.recv().await {
Self::handle_signing_task(
&semaphore,
&auth_agg,
&signing_queue_sender,
&execution_queue_sender,
Expand All @@ -211,8 +217,19 @@ where
}
}

async fn should_proceed_signing(sui_client: &Arc<SuiClient<C>>) -> bool {
let Ok(Ok(is_paused)) =
retry_with_max_elapsed_time!(sui_client.is_bridge_paused(), Duration::from_secs(600))
else {
error!("Failed to get bridge status after retry");
return false;
};
!is_paused
}

#[instrument(level = "error", skip_all, fields(action_key=?action.0.key(), attempt_times=?action.1))]
async fn handle_signing_task(
semaphore: &Arc<Semaphore>,
auth_agg: &Arc<BridgeAuthorityAggregator>,
signing_queue_sender: &mysten_metrics::metered_channel::Sender<
BridgeActionExecutionWrapper,
Expand All @@ -229,22 +246,38 @@ where
let action_key = action.0.key();
info!("Received action for signing: {:?}", action.0);

// TODO: this is a temporary fix to avoid signing when the bridge is paused.
// but the way is implemented is not ideal:
// 1. it should check the direction
// 2. should use a better mechanism to check the bridge status instead of polling for each action
let should_proceed = Self::should_proceed_signing(sui_client).await;
if !should_proceed {
metrics.action_executor_signing_queue_skipped_actions.inc();
warn!("skipping signing task: {:?}", action_key);
return;
}

let auth_agg_clone = auth_agg.clone();
let signing_queue_sender_clone = signing_queue_sender.clone();
let execution_queue_sender_clone = execution_queue_sender.clone();
let sui_client_clone = sui_client.clone();
let store_clone = store.clone();
let metrics_clone = metrics.clone();
spawn_logged_monitored_task!(Self::request_signature(
sui_client_clone,
auth_agg_clone,
action,
store_clone,
signing_queue_sender_clone,
execution_queue_sender_clone,
metrics_clone,
)
.instrument(tracing::debug_span!("request_signature", action_key=?action_key)));
let semaphore_clone = semaphore.clone();
spawn_logged_monitored_task!(
Self::request_signatures(
semaphore_clone,
sui_client_clone,
auth_agg_clone,
action,
store_clone,
signing_queue_sender_clone,
execution_queue_sender_clone,
metrics_clone,
)
.instrument(tracing::debug_span!("request_signatures", action_key=?action_key)),
"request_signatures"
);
}

// Checks if the action is already processed on chain.
Expand All @@ -254,6 +287,7 @@ where
sui_client: &Arc<SuiClient<C>>,
action: &BridgeAction,
store: &Arc<BridgeOrchestratorTables>,
metrics: &Arc<BridgeMetrics>,
) -> bool {
let status = sui_client
.get_token_transfer_action_onchain_status_until_success(
Expand All @@ -267,6 +301,7 @@ where
"Action already approved or claimed, removing action from pending logs: {:?}",
action
);
metrics.action_executor_already_processed_actions.inc();
store
.remove_pending_actions(&[action.digest()])
.unwrap_or_else(|e| {
Expand All @@ -280,7 +315,10 @@ where
}
}

async fn request_signature(
// TODO: introduce a way to properly stagger the handling
// for various validators.
async fn request_signatures(
semaphore: Arc<Semaphore>,
sui_client: Arc<SuiClient<C>>,
auth_agg: Arc<BridgeAuthorityAggregator>,
action: BridgeActionExecutionWrapper,
Expand All @@ -291,6 +329,11 @@ where
>,
metrics: Arc<BridgeMetrics>,
) {
let _permit = semaphore
.acquire()
.await
.expect("semaphore should not be closed");
info!("requesting signatures");
let BridgeActionExecutionWrapper(action, attempt_times) = action;

// Only token transfer action should reach here
Expand All @@ -300,8 +343,13 @@ where
};

// If the action is already processed, skip it.
if Self::handle_already_processed_token_transfer_action_maybe(&sui_client, &action, &store)
.await
if Self::handle_already_processed_token_transfer_action_maybe(
&sui_client,
&action,
&store,
&metrics,
)
.await
{
return;
}
Expand Down Expand Up @@ -427,8 +475,10 @@ where
let ceriticate_clone = certificate.clone();

// Check once: if the action is already processed, skip it.
if Self::handle_already_processed_token_transfer_action_maybe(sui_client, action, store)
.await
if Self::handle_already_processed_token_transfer_action_maybe(
sui_client, action, store, metrics,
)
.await
{
info!("Action already processed, skipping");
return;
Expand Down Expand Up @@ -464,8 +514,10 @@ where
let tx_digest = *signed_tx.digest();

// Check twice: If the action is already processed, skip it.
if Self::handle_already_processed_token_transfer_action_maybe(sui_client, action, store)
.await
if Self::handle_already_processed_token_transfer_action_maybe(
sui_client, action, store, metrics,
)
.await
{
info!("Action already processed, skipping");
return;
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-bridge/src/client/bridge_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl BridgeClient {
let nonce = a.nonce.to_string();
let type_ = (a.blocklist_type as u8).to_string();
let keys = a
.blocklisted_members
.members_to_update
.iter()
.map(|k| Hex::encode(k.as_bytes()))
.collect::<Vec<_>>()
Expand Down Expand Up @@ -485,7 +485,7 @@ mod tests {
chain_id: BridgeChainId::EthSepolia,
nonce: 1,
blocklist_type: crate::types::BlocklistType::Blocklist,
blocklisted_members: vec![pub_key_bytes.clone()],
members_to_update: vec![pub_key_bytes.clone()],
});
assert_eq!(
BridgeClient::bridge_action_to_path(&action),
Expand All @@ -501,7 +501,7 @@ mod tests {
chain_id: BridgeChainId::EthSepolia,
nonce: 1,
blocklist_type: crate::types::BlocklistType::Blocklist,
blocklisted_members: vec![pub_key_bytes.clone(), pub_key_bytes2.clone()],
members_to_update: vec![pub_key_bytes.clone(), pub_key_bytes2.clone()],
});
assert_eq!(
BridgeClient::bridge_action_to_path(&action),
Expand Down
Loading

0 comments on commit 99cf69e

Please sign in to comment.