diff --git a/Cargo.lock b/Cargo.lock index 93d219c5c66dd..7151a9b199e90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12162,6 +12162,7 @@ dependencies = [ "sui-data-ingestion-core", "sui-test-transaction-builder", "sui-types", + "telemetry-subscribers", "test-cluster", "tokio", "tracing", diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml index 66833b7044eea..f589b401e0342 100644 --- a/crates/sui-bridge-indexer/Cargo.toml +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -11,18 +11,19 @@ serde.workspace = true diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] } ethers = "2.0" tokio = { workspace = true, features = ["full"] } -sui-types.workspace = true -prometheus.workspace = true +anyhow.workspace = true async-trait.workspace = true -sui-data-ingestion-core.workspace = true -sui-bridge.workspace = true -clap.workspace = true -tracing.workspace = true +bcs.workspace = true bin-version.workspace = true -anyhow.workspace = true +clap.workspace = true mysten-metrics.workspace = true -bcs.workspace = true +prometheus.workspace = true serde_yaml.workspace = true +sui-bridge.workspace = true +sui-data-ingestion-core.workspace = true +sui-types.workspace = true +telemetry-subscribers.workspace = true +tracing.workspace = true [dev-dependencies] sui-types = { workspace = true, features = ["test-utils"] } diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index 97bee58203d8c..c6fe6c291b177 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -18,6 +18,7 @@ pub struct TokenTransfer { block_height: u64, timestamp_ms: u64, txn_hash: Vec, + txn_sender: Vec, status: TokenTransferStatus, gas_usage: i64, data_source: BridgeDataSource, @@ -40,6 +41,7 @@ impl From for DBTokenTransfer { block_height: value.block_height as i64, timestamp_ms: value.timestamp_ms as i64, txn_hash: value.txn_hash, + txn_sender: value.txn_sender.clone(), status: value.status.to_string(), gas_usage: value.gas_usage, data_source: value.data_source.to_string(), @@ -63,6 +65,9 @@ impl TryFrom<&TokenTransfer> for DBTokenTransferData { .map(|data| DBTokenTransferData { chain_id: value.chain_id as i32, nonce: value.nonce as i64, + block_height: value.block_height as i64, + timestamp_ms: value.timestamp_ms as i64, + txn_hash: value.txn_hash.clone(), sender_address: data.sender_address.clone(), destination_chain: data.destination_chain as i32, recipient_address: data.recipient_address.clone(), diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 81abfaba04d13..5255735966307 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -40,6 +40,10 @@ struct Args { #[tokio::main] async fn main() -> Result<()> { + let _guard = telemetry_subscribers::TelemetryConfig::new() + .with_env() + .init(); + let args = Args::parse(); // load config diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql index fffdaa5e6234b..5f688a55f20b0 100644 --- a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql @@ -2,6 +2,9 @@ CREATE TABLE token_transfer_data ( chain_id INT NOT NULL, nonce BIGINT NOT NULL, + block_height BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + txn_hash bytea NOT NULL, sender_address bytea NOT NULL, destination_chain INT NOT NULL, recipient_address bytea NOT NULL, @@ -9,6 +12,9 @@ CREATE TABLE token_transfer_data amount BIGINT NOT NULL, PRIMARY KEY(chain_id, nonce) ); +CREATE INDEX token_transfer_data_block_height ON token_transfer_data (block_height); +CREATE INDEX token_transfer_data_timestamp_ms ON token_transfer_data (timestamp_ms); +CREATE INDEX token_transfer_data_sender_address ON token_transfer_data (sender_address); CREATE INDEX token_transfer_data_destination_chain ON token_transfer_data (destination_chain); CREATE INDEX token_transfer_data_token_id ON token_transfer_data (token_id); @@ -20,7 +26,10 @@ CREATE TABLE token_transfer block_height BIGINT NOT NULL, timestamp_ms BIGINT NOT NULL, txn_hash bytea NOT NULL, + txn_sender bytea NOT NULL, gas_usage BIGINT NOT NULL, data_source TEXT NOT NULL, PRIMARY KEY(chain_id, nonce, status) -); \ No newline at end of file +); +CREATE INDEX token_transfer_block_height ON token_transfer (block_height); +CREATE INDEX token_transfer_timestamp_ms ON token_transfer (timestamp_ms); diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index ee0bcc88ab646..04c5c9405b278 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -13,6 +13,7 @@ pub struct TokenTransfer { pub block_height: i64, pub timestamp_ms: i64, pub txn_hash: Vec, + pub txn_sender: Vec, pub gas_usage: i64, pub data_source: String, } @@ -22,6 +23,9 @@ pub struct TokenTransfer { pub struct TokenTransferData { pub chain_id: i32, pub nonce: i64, + pub block_height: i64, + pub timestamp_ms: i64, + pub txn_hash: Vec, pub sender_address: Vec, pub destination_chain: i32, pub recipient_address: Vec, diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs index e13e095c22654..791ef52b96868 100644 --- a/crates/sui-bridge-indexer/src/schema.rs +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -11,6 +11,7 @@ diesel::table! { block_height -> Int8, timestamp_ms -> Int8, txn_hash -> Bytea, + txn_sender -> Bytea, gas_usage -> Int8, data_source -> Text, } @@ -20,6 +21,9 @@ diesel::table! { token_transfer_data (chain_id, nonce) { chain_id -> Int4, nonce -> Int8, + block_height -> Int8, + timestamp_ms -> Int8, + txn_hash -> Bytea, sender_address -> Bytea, destination_chain -> Int4, recipient_address -> Bytea, @@ -28,4 +32,4 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!(token_transfer, token_transfer_data,); +diesel::allow_tables_to_appear_in_same_query!(token_transfer, token_transfer_data); diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs index b05bba87ffdda..61b2e1697e8e2 100644 --- a/crates/sui-bridge-indexer/src/worker.rs +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -16,9 +16,9 @@ use sui_bridge::events::{ }; use sui_bridge::types::EthLog; use sui_data_ingestion_core::Worker; -use sui_types::effects::TransactionEffectsAPI; use sui_types::{ base_types::ObjectID, + effects::TransactionEffectsAPI, full_checkpoint_content::{CheckpointData, CheckpointTransaction}, transaction::{TransactionDataAPI, TransactionKind}, BRIDGE_ADDRESS, SUI_BRIDGE_OBJECT_ID, @@ -57,35 +57,36 @@ impl BridgeWorker { // Process a transaction that has been identified as a bridge transaction. fn process_transaction(&self, tx: &CheckpointTransaction, checkpoint: u64, timestamp_ms: u64) { - if let Some(event) = &tx.events { - event.data.iter().for_each(|ev| { + if let Some(events) = &tx.events { + events.data.iter().for_each(|ev| { if ev.type_.address == BRIDGE_ADDRESS { let token_transfer = match ev.type_.name.as_str() { "TokenDepositedEvent" => { - println!("Observed Sui Deposit"); - // todo: handle deserialization error - let event: MoveTokenDepositedEvent = + info!("Observed Sui Deposit {:?}", ev); + // TODO: handle deserialization error + let move_event: MoveTokenDepositedEvent = bcs::from_bytes(&ev.contents).unwrap(); Some(TokenTransfer { - chain_id: event.source_chain, - nonce: event.seq_num, + chain_id: move_event.source_chain, + nonce: move_event.seq_num, block_height: checkpoint, timestamp_ms, txn_hash: tx.transaction.digest().inner().to_vec(), + txn_sender: ev.sender.to_vec(), status: TokenTransferStatus::Deposited, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), data_source: BridgeDataSource::Sui, data: Some(TokenTransferData { - sender_address: event.sender_address, - destination_chain: event.target_chain, - recipient_address: event.target_address, - token_id: event.token_type, - amount: event.amount_sui_adjusted, + destination_chain: move_event.target_chain, + sender_address: move_event.sender_address.clone(), + recipient_address: move_event.target_address.clone(), + token_id: move_event.token_type, + amount: move_event.amount_sui_adjusted, }), }) } "TokenTransferApproved" => { - println!("Observed Sui Approval"); + info!("Observed Sui Approval {:?}", ev); let event: MoveTokenTransferApproved = bcs::from_bytes(&ev.contents).unwrap(); Some(TokenTransfer { @@ -94,6 +95,7 @@ impl BridgeWorker { block_height: checkpoint, timestamp_ms, txn_hash: tx.transaction.digest().inner().to_vec(), + txn_sender: ev.sender.to_vec(), status: TokenTransferStatus::Approved, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), data_source: BridgeDataSource::Sui, @@ -101,7 +103,7 @@ impl BridgeWorker { }) } "TokenTransferClaimed" => { - println!("Observed Sui Claim"); + info!("Observed Sui Claim {:?}", ev); let event: MoveTokenTransferClaimed = bcs::from_bytes(&ev.contents).unwrap(); Some(TokenTransfer { @@ -110,6 +112,7 @@ impl BridgeWorker { block_height: checkpoint, timestamp_ms, txn_hash: tx.transaction.digest().inner().to_vec(), + txn_sender: ev.sender.to_vec(), status: TokenTransferStatus::Claimed, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), data_source: BridgeDataSource::Sui, @@ -120,7 +123,7 @@ impl BridgeWorker { }; if let Some(transfer) = token_transfer { - println!("SUI: Storing bridge event : {:?}", ev.type_); + info!("SUI: Storing bridge event : {:?}", ev.type_); write(&self.pg_pool, transfer); } }; @@ -152,24 +155,25 @@ pub async fn process_eth_transaction( let gas = transaction.gas; let tx_hash = log.tx_hash; - println!("Observed Eth bridge event: {:#?}", bridge_event); + info!("Observed Eth bridge event: {:?}", bridge_event); match bridge_event { EthBridgeEvent::EthSuiBridgeEvents(bridge_event) => match bridge_event { EthSuiBridgeEvents::TokensDepositedFilter(bridge_event) => { - println!("Observed Eth Deposit"); + info!("Observed Eth Deposit {:?}", bridge_event); let transfer = TokenTransfer { chain_id: bridge_event.source_chain_id, nonce: bridge_event.nonce, block_height: block_number, timestamp_ms: timestamp, txn_hash: tx_hash.as_bytes().to_vec(), + txn_sender: bridge_event.sender_address.as_bytes().to_vec(), status: TokenTransferStatus::Deposited, gas_usage: gas.as_u64() as i64, data_source: BridgeDataSource::Eth, data: Some(TokenTransferData { - sender_address: bridge_event.sender_address.as_bytes().to_vec(), destination_chain: bridge_event.destination_chain_id, + sender_address: bridge_event.sender_address.as_bytes().to_vec(), recipient_address: bridge_event.recipient_address.to_vec(), token_id: bridge_event.token_id, amount: bridge_event.sui_adjusted_amount, @@ -179,13 +183,14 @@ pub async fn process_eth_transaction( write(&pool, transfer); } EthSuiBridgeEvents::TokensClaimedFilter(bridge_event) => { - println!("Observed Eth Claim"); + info!("Observed Eth Claim {:?}", bridge_event); let transfer = TokenTransfer { chain_id: bridge_event.source_chain_id, nonce: bridge_event.nonce, block_height: block_number, timestamp_ms: timestamp, txn_hash: tx_hash.as_bytes().to_vec(), + txn_sender: bridge_event.sender_address.to_vec(), status: TokenTransferStatus::Claimed, gas_usage: gas.as_u64() as i64, data_source: BridgeDataSource::Eth,