From 31a057456d4310dbecb473799b9c72cb66ef56c7 Mon Sep 17 00:00:00 2001 From: Lu Zhang <8418040+longbowlu@users.noreply.github.com> Date: Tue, 13 Feb 2024 17:20:31 -0800 Subject: [PATCH] [bridge 39/n] move code in main to node.rs (#16123) ## Description as title. So that we can start node in memory for testing. ## Test Plan How did you test the new or updated feature? --- If your changes are not user-facing and do not break anything, you can skip the following section. Otherwise, please briefly describe what has changed under the Release Notes section. ### Type of Change (Check all that apply) - [ ] protocol change - [ ] user-visible impact - [ ] breaking change for a client SDKs - [ ] breaking change for FNs (FN binary must upgrade) - [ ] breaking change for validators or node operators (must upgrade binaries) - [ ] breaking change for on-chain data layout - [ ] necessitate either a data wipe or data migration ### Release notes --- crates/sui-bridge/src/lib.rs | 1 + crates/sui-bridge/src/main.rs | 154 +------------------------------- crates/sui-bridge/src/node.rs | 162 ++++++++++++++++++++++++++++++++++ 3 files changed, 166 insertions(+), 151 deletions(-) create mode 100644 crates/sui-bridge/src/node.rs diff --git a/crates/sui-bridge/src/lib.rs b/crates/sui-bridge/src/lib.rs index edb0210ba720d..3f57af34d49f7 100644 --- a/crates/sui-bridge/src/lib.rs +++ b/crates/sui-bridge/src/lib.rs @@ -10,6 +10,7 @@ pub mod error; pub mod eth_client; pub mod eth_syncer; pub mod events; +pub mod node; pub mod orchestrator; pub mod server; pub mod storage; diff --git a/crates/sui-bridge/src/main.rs b/crates/sui-bridge/src/main.rs index f458c50950a1f..ed25477bde4d4 100644 --- a/crates/sui-bridge/src/main.rs +++ b/crates/sui-bridge/src/main.rs @@ -4,24 +4,12 @@ use clap::Parser; use mysten_metrics::start_prometheus_server; use std::{ - collections::HashMap, net::{IpAddr, Ipv4Addr, SocketAddr}, path::PathBuf, - sync::Arc, - time::Duration, -}; -use sui_bridge::{ - action_executor::BridgeActionExecutor, - client::bridge_authority_aggregator::BridgeAuthorityAggregator, - config::{BridgeClientConfig, BridgeNodeConfig}, - eth_syncer::EthSyncer, - orchestrator::BridgeOrchestrator, - server::{handler::BridgeRequestHandler, run_server}, - storage::BridgeOrchestratorTables, - sui_syncer::SuiSyncer, }; +use sui_bridge::config::BridgeNodeConfig; +use sui_bridge::node::run_bridge_node; use sui_config::Config; -use tokio::task::JoinHandle; use tracing::info; // TODO consolidate this with sui-node/src/main.rs, but where to put it? @@ -70,141 +58,5 @@ async fn main() -> anyhow::Result<()> { .with_prom_registry(&prometheus_registry) .init(); - let (server_config, client_config) = config.validate().await?; - - // Start Client - let _handles = if let Some(client_config) = client_config { - start_client_components(client_config).await - } else { - Ok(vec![]) - }?; - - // Start Server - let socket_address = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), - server_config.server_listen_port, - ); - run_server( - &socket_address, - BridgeRequestHandler::new( - server_config.key, - server_config.sui_client, - server_config.eth_client, - server_config.approved_governance_actions, - ), - ) - .await; - Ok(()) -} - -// TODO: is there a way to clean up the overrides after it's stored in DB? -async fn start_client_components( - client_config: BridgeClientConfig, -) -> anyhow::Result>> { - let store: std::sync::Arc = - BridgeOrchestratorTables::new(&client_config.db_path.join("client")); - let stored_module_cursors = store - .get_sui_event_cursors(&client_config.sui_bridge_modules) - .map_err(|e| anyhow::anyhow!("Unable to get sui event cursors from storage: {e:?}"))?; - let mut sui_modules_to_watch = HashMap::new(); - for (module, cursor) in client_config - .sui_bridge_modules - .iter() - .zip(stored_module_cursors) - { - if client_config - .sui_bridge_modules_last_processed_event_id_override - .contains_key(module) - { - sui_modules_to_watch.insert( - module.clone(), - client_config.sui_bridge_modules_last_processed_event_id_override[module], - ); - info!( - "Overriding cursor for sui bridge module {} to {:?}. Stored cursor: {:?}", - module, - client_config.sui_bridge_modules_last_processed_event_id_override[module], - cursor - ); - } else if let Some(cursor) = cursor { - sui_modules_to_watch.insert(module.clone(), cursor); - } else { - return Err(anyhow::anyhow!( - "No cursor found for sui bridge module {} in storage or config override", - module - )); - } - } - - let stored_eth_cursors = store - .get_eth_event_cursors(&client_config.eth_bridge_contracts) - .map_err(|e| anyhow::anyhow!("Unable to get eth event cursors from storage: {e:?}"))?; - let mut eth_contracts_to_watch = HashMap::new(); - for (contract, cursor) in client_config - .eth_bridge_contracts - .iter() - .zip(stored_eth_cursors) - { - if client_config - .eth_bridge_contracts_start_block_override - .contains_key(contract) - { - eth_contracts_to_watch.insert( - *contract, - client_config.eth_bridge_contracts_start_block_override[contract], - ); - info!( - "Overriding cursor for eth bridge contract {} to {}. Stored cursor: {:?}", - contract, client_config.eth_bridge_contracts_start_block_override[contract], cursor - ); - } else if let Some(cursor) = cursor { - // +1: The stored value is the last block that was processed, so we start from the next block. - eth_contracts_to_watch.insert(*contract, cursor + 1); - } else { - return Err(anyhow::anyhow!( - "No cursor found for eth contract {} in storage or config override", - contract - )); - } - } - - let sui_client = client_config.sui_client.clone(); - - let mut all_handles = vec![]; - let (task_handles, eth_events_rx, _) = - EthSyncer::new(client_config.eth_client.clone(), eth_contracts_to_watch) - .run() - .await - .expect("Failed to start eth syncer"); - all_handles.extend(task_handles); - - let (task_handles, sui_events_rx) = - SuiSyncer::new(client_config.sui_client, sui_modules_to_watch) - .run(Duration::from_secs(2)) - .await - .expect("Failed to start sui syncer"); - all_handles.extend(task_handles); - - let committee = Arc::new( - sui_client - .get_bridge_committee() - .await - .expect("Failed to get committee"), - ); - let bridge_auth_agg = BridgeAuthorityAggregator::new(committee); - - let bridge_action_executor = BridgeActionExecutor::new( - sui_client.clone(), - Arc::new(bridge_auth_agg), - store.clone(), - client_config.key, - client_config.sui_address, - client_config.gas_object_ref.0, - ); - - let orchestrator = - BridgeOrchestrator::new(sui_client, sui_events_rx, eth_events_rx, store.clone()); - - all_handles.extend(orchestrator.run(bridge_action_executor)); - Ok(all_handles) + run_bridge_node(config).await } diff --git a/crates/sui-bridge/src/node.rs b/crates/sui-bridge/src/node.rs new file mode 100644 index 0000000000000..1179dde2504cf --- /dev/null +++ b/crates/sui-bridge/src/node.rs @@ -0,0 +1,162 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + action_executor::BridgeActionExecutor, + client::bridge_authority_aggregator::BridgeAuthorityAggregator, + config::{BridgeClientConfig, BridgeNodeConfig}, + eth_syncer::EthSyncer, + orchestrator::BridgeOrchestrator, + server::{handler::BridgeRequestHandler, run_server}, + storage::BridgeOrchestratorTables, + sui_syncer::SuiSyncer, +}; +use std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr, SocketAddr}, + sync::Arc, + time::Duration, +}; +use tokio::task::JoinHandle; +use tracing::info; + +pub async fn run_bridge_node(config: BridgeNodeConfig) -> anyhow::Result<()> { + let (server_config, client_config) = config.validate().await?; + + // Start Client + let _handles = if let Some(client_config) = client_config { + start_client_components(client_config).await + } else { + Ok(vec![]) + }?; + + // Start Server + let socket_address = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + server_config.server_listen_port, + ); + run_server( + &socket_address, + BridgeRequestHandler::new( + server_config.key, + server_config.sui_client, + server_config.eth_client, + server_config.approved_governance_actions, + ), + ) + .await; + + Ok(()) +} + +// TODO: is there a way to clean up the overrides after it's stored in DB? +async fn start_client_components( + client_config: BridgeClientConfig, +) -> anyhow::Result>> { + let store: std::sync::Arc = + BridgeOrchestratorTables::new(&client_config.db_path.join("client")); + let stored_module_cursors = store + .get_sui_event_cursors(&client_config.sui_bridge_modules) + .map_err(|e| anyhow::anyhow!("Unable to get sui event cursors from storage: {e:?}"))?; + let mut sui_modules_to_watch = HashMap::new(); + for (module, cursor) in client_config + .sui_bridge_modules + .iter() + .zip(stored_module_cursors) + { + if client_config + .sui_bridge_modules_last_processed_event_id_override + .contains_key(module) + { + sui_modules_to_watch.insert( + module.clone(), + client_config.sui_bridge_modules_last_processed_event_id_override[module], + ); + info!( + "Overriding cursor for sui bridge module {} to {:?}. Stored cursor: {:?}", + module, + client_config.sui_bridge_modules_last_processed_event_id_override[module], + cursor + ); + } else if let Some(cursor) = cursor { + sui_modules_to_watch.insert(module.clone(), cursor); + } else { + return Err(anyhow::anyhow!( + "No cursor found for sui bridge module {} in storage or config override", + module + )); + } + } + + let stored_eth_cursors = store + .get_eth_event_cursors(&client_config.eth_bridge_contracts) + .map_err(|e| anyhow::anyhow!("Unable to get eth event cursors from storage: {e:?}"))?; + let mut eth_contracts_to_watch = HashMap::new(); + for (contract, cursor) in client_config + .eth_bridge_contracts + .iter() + .zip(stored_eth_cursors) + { + if client_config + .eth_bridge_contracts_start_block_override + .contains_key(contract) + { + eth_contracts_to_watch.insert( + *contract, + client_config.eth_bridge_contracts_start_block_override[contract], + ); + info!( + "Overriding cursor for eth bridge contract {} to {}. Stored cursor: {:?}", + contract, client_config.eth_bridge_contracts_start_block_override[contract], cursor + ); + } else if let Some(cursor) = cursor { + // +1: The stored value is the last block that was processed, so we start from the next block. + eth_contracts_to_watch.insert(*contract, cursor + 1); + } else { + return Err(anyhow::anyhow!( + "No cursor found for eth contract {} in storage or config override", + contract + )); + } + } + + let sui_client = client_config.sui_client.clone(); + + let mut all_handles = vec![]; + let (task_handles, eth_events_rx, _) = + EthSyncer::new(client_config.eth_client.clone(), eth_contracts_to_watch) + .run() + .await + .expect("Failed to start eth syncer"); + all_handles.extend(task_handles); + + let (task_handles, sui_events_rx) = + SuiSyncer::new(client_config.sui_client, sui_modules_to_watch) + .run(Duration::from_secs(2)) + .await + .expect("Failed to start sui syncer"); + all_handles.extend(task_handles); + + let committee = Arc::new( + sui_client + .get_bridge_committee() + .await + .expect("Failed to get committee"), + ); + let bridge_auth_agg = BridgeAuthorityAggregator::new(committee); + + let bridge_action_executor = BridgeActionExecutor::new( + sui_client.clone(), + Arc::new(bridge_auth_agg), + store.clone(), + client_config.key, + client_config.sui_address, + client_config.gas_object_ref.0, + ); + + let orchestrator = + BridgeOrchestrator::new(sui_client, sui_events_rx, eth_events_rx, store.clone()); + + all_handles.extend(orchestrator.run(bridge_action_executor)); + Ok(all_handles) +}