diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index 787bd3853f74f..b86dd5883a93e 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -13,6 +13,7 @@ pub mod metrics; pub mod models; pub mod postgres_manager; pub mod schema; +pub mod storage; pub mod sui_transaction_handler; pub mod sui_transaction_queries; pub mod types; diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index d0a2e94f4887f..3ad53b1d02d31 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -24,7 +24,8 @@ use sui_bridge_indexer::config::IndexerConfig; use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper; use sui_bridge_indexer::metrics::BridgeIndexerMetrics; use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store}; -use sui_bridge_indexer::sui_bridge_indexer::{PgBridgePersistent, SuiBridgeDataMapper}; +use sui_bridge_indexer::storage::PgBridgePersistent; +use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper; use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource; use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop; use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task; diff --git a/crates/sui-bridge-indexer/src/storage.rs b/crates/sui-bridge-indexer/src/storage.rs new file mode 100644 index 0000000000000..9e43d022e5120 --- /dev/null +++ b/crates/sui-bridge-indexer/src/storage.rs @@ -0,0 +1,180 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::{anyhow, Error}; +use async_trait::async_trait; +use diesel::dsl::now; +use diesel::{ExpressionMethods, TextExpressionMethods}; +use diesel::{OptionalExtension, QueryDsl, SelectableHelper}; +use diesel_async::scoped_futures::ScopedFutureExt; +use diesel_async::AsyncConnection; +use diesel_async::RunQueryDsl; + +use crate::postgres_manager::PgPool; +use crate::schema::progress_store::{columns, dsl}; +use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data}; +use crate::{models, schema, ProcessedTxnData}; +use sui_indexer_builder::indexer_builder::{IndexerProgressStore, Persistent}; +use sui_indexer_builder::Task; + +/// Persistent layer impl +#[derive(Clone)] +pub struct PgBridgePersistent { + pool: PgPool, +} + +impl PgBridgePersistent { + pub fn new(pool: PgPool) -> Self { + Self { pool } + } +} + +// TODO: this is shared between SUI and ETH, move to different file. +#[async_trait] +impl Persistent for PgBridgePersistent { + async fn write(&self, data: Vec) -> Result<(), Error> { + if data.is_empty() { + return Ok(()); + } + let connection = &mut self.pool.get().await?; + connection + .transaction(|conn| { + async move { + for d in data { + match d { + ProcessedTxnData::TokenTransfer(t) => { + diesel::insert_into(token_transfer::table) + .values(&t.to_db()) + .on_conflict_do_nothing() + .execute(conn) + .await?; + + if let Some(d) = t.to_data_maybe() { + diesel::insert_into(token_transfer_data::table) + .values(&d) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } + } + ProcessedTxnData::Error(e) => { + diesel::insert_into(sui_error_transactions::table) + .values(&e.to_db()) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } + } + } + Ok(()) + } + .scope_boxed() + }) + .await + } +} + +#[async_trait] +impl IndexerProgressStore for PgBridgePersistent { + async fn load_progress(&self, task_name: String) -> anyhow::Result { + let mut conn = self.pool.get().await?; + let cp: Option = dsl::progress_store + .find(&task_name) + .select(models::ProgressStore::as_select()) + .first(&mut conn) + .await + .optional()?; + Ok(cp + .ok_or(anyhow!("Cannot found progress for task {task_name}"))? + .checkpoint as u64) + } + + async fn save_progress( + &mut self, + task_name: String, + checkpoint_number: u64, + ) -> anyhow::Result<()> { + let mut conn = self.pool.get().await?; + diesel::insert_into(schema::progress_store::table) + .values(&models::ProgressStore { + task_name, + checkpoint: checkpoint_number as i64, + // Target checkpoint and timestamp will only be written for new entries + target_checkpoint: i64::MAX, + // Timestamp is defaulted to current time in DB if None + timestamp: None, + }) + .on_conflict(dsl::task_name) + .do_update() + .set(( + columns::checkpoint.eq(checkpoint_number as i64), + columns::timestamp.eq(now), + )) + .execute(&mut conn) + .await?; + Ok(()) + } + + async fn get_ongoing_tasks(&self, prefix: &str) -> Result, anyhow::Error> { + let mut conn = self.pool.get().await?; + // get all unfinished tasks + let cp: Vec = dsl::progress_store + // TODO: using like could be error prone, change the progress store schema to stare the task name properly. + .filter(columns::task_name.like(format!("{prefix} - %"))) + .filter(columns::checkpoint.lt(columns::target_checkpoint)) + .order_by(columns::target_checkpoint.desc()) + .load(&mut conn) + .await?; + Ok(cp.into_iter().map(|d| d.into()).collect()) + } + + async fn get_largest_backfill_task_target_checkpoint( + &self, + prefix: &str, + ) -> Result, Error> { + let mut conn = self.pool.get().await?; + let cp: Option = dsl::progress_store + .select(columns::target_checkpoint) + // TODO: using like could be error prone, change the progress store schema to stare the task name properly. + .filter(columns::task_name.like(format!("{prefix} - %"))) + .filter(columns::target_checkpoint.ne(i64::MAX)) + .order_by(columns::target_checkpoint.desc()) + .first::(&mut conn) + .await + .optional()?; + Ok(cp.map(|c| c as u64)) + } + + async fn register_task( + &mut self, + task_name: String, + checkpoint: u64, + target_checkpoint: u64, + ) -> Result<(), anyhow::Error> { + let mut conn = self.pool.get().await?; + diesel::insert_into(schema::progress_store::table) + .values(models::ProgressStore { + task_name, + checkpoint: checkpoint as i64, + target_checkpoint: target_checkpoint as i64, + // Timestamp is defaulted to current time in DB if None + timestamp: None, + }) + .execute(&mut conn) + .await?; + Ok(()) + } + + async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> { + let mut conn = self.pool.get().await?; + diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name))) + .set(( + columns::checkpoint.eq(task.checkpoint as i64), + columns::target_checkpoint.eq(task.target_checkpoint as i64), + columns::timestamp.eq(now), + )) + .execute(&mut conn) + .await?; + Ok(()) + } +} diff --git a/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs b/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs index 7caf595e1b9f6..f455665b6392a 100644 --- a/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs +++ b/crates/sui-bridge-indexer/src/sui_bridge_indexer.rs @@ -1,21 +1,13 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use anyhow::{anyhow, Error}; -use async_trait::async_trait; -use diesel::dsl::now; -use diesel::{ExpressionMethods, TextExpressionMethods}; -use diesel::{OptionalExtension, QueryDsl, SelectableHelper}; -use diesel_async::scoped_futures::ScopedFutureExt; -use diesel_async::AsyncConnection; -use diesel_async::RunQueryDsl; +use anyhow::Error; use tracing::info; use sui_bridge::events::{ MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, }; -use sui_indexer_builder::indexer_builder::{DataMapper, IndexerProgressStore, Persistent}; -use sui_indexer_builder::Task; +use sui_indexer_builder::indexer_builder::DataMapper; use sui_types::effects::TransactionEffectsAPI; use sui_types::event::Event; use sui_types::execution_status::ExecutionStatus; @@ -23,177 +15,12 @@ use sui_types::full_checkpoint_content::CheckpointTransaction; use sui_types::{BRIDGE_ADDRESS, SUI_BRIDGE_OBJECT_ID}; use crate::metrics::BridgeIndexerMetrics; -use crate::postgres_manager::PgPool; -use crate::schema::progress_store::{columns, dsl}; -use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data}; use crate::sui_datasource::CheckpointTxnData; use crate::{ - models, schema, BridgeDataSource, ProcessedTxnData, SuiTxnError, TokenTransfer, - TokenTransferData, TokenTransferStatus, + BridgeDataSource, ProcessedTxnData, SuiTxnError, TokenTransfer, TokenTransferData, + TokenTransferStatus, }; -/// Persistent layer impl -#[derive(Clone)] -pub struct PgBridgePersistent { - pool: PgPool, -} - -impl PgBridgePersistent { - pub fn new(pool: PgPool) -> Self { - Self { pool } - } -} - -// TODO: this is shared between SUI and ETH, move to different file. -#[async_trait] -impl Persistent for PgBridgePersistent { - async fn write(&self, data: Vec) -> Result<(), Error> { - if data.is_empty() { - return Ok(()); - } - let connection = &mut self.pool.get().await?; - connection - .transaction(|conn| { - async move { - for d in data { - match d { - ProcessedTxnData::TokenTransfer(t) => { - diesel::insert_into(token_transfer::table) - .values(&t.to_db()) - .on_conflict_do_nothing() - .execute(conn) - .await?; - - if let Some(d) = t.to_data_maybe() { - diesel::insert_into(token_transfer_data::table) - .values(&d) - .on_conflict_do_nothing() - .execute(conn) - .await?; - } - } - ProcessedTxnData::Error(e) => { - diesel::insert_into(sui_error_transactions::table) - .values(&e.to_db()) - .on_conflict_do_nothing() - .execute(conn) - .await?; - } - } - } - Ok(()) - } - .scope_boxed() - }) - .await - } -} - -#[async_trait] -impl IndexerProgressStore for PgBridgePersistent { - async fn load_progress(&self, task_name: String) -> anyhow::Result { - let mut conn = self.pool.get().await?; - let cp: Option = dsl::progress_store - .find(&task_name) - .select(models::ProgressStore::as_select()) - .first(&mut conn) - .await - .optional()?; - Ok(cp - .ok_or(anyhow!("Cannot found progress for task {task_name}"))? - .checkpoint as u64) - } - - async fn save_progress( - &mut self, - task_name: String, - checkpoint_number: u64, - ) -> anyhow::Result<()> { - let mut conn = self.pool.get().await?; - diesel::insert_into(schema::progress_store::table) - .values(&models::ProgressStore { - task_name, - checkpoint: checkpoint_number as i64, - // Target checkpoint and timestamp will only be written for new entries - target_checkpoint: i64::MAX, - // Timestamp is defaulted to current time in DB if None - timestamp: None, - }) - .on_conflict(dsl::task_name) - .do_update() - .set(( - columns::checkpoint.eq(checkpoint_number as i64), - columns::timestamp.eq(now), - )) - .execute(&mut conn) - .await?; - Ok(()) - } - - async fn get_ongoing_tasks(&self, prefix: &str) -> Result, anyhow::Error> { - let mut conn = self.pool.get().await?; - // get all unfinished tasks - let cp: Vec = dsl::progress_store - // TODO: using like could be error prone, change the progress store schema to stare the task name properly. - .filter(columns::task_name.like(format!("{prefix} - %"))) - .filter(columns::checkpoint.lt(columns::target_checkpoint)) - .order_by(columns::target_checkpoint.desc()) - .load(&mut conn) - .await?; - Ok(cp.into_iter().map(|d| d.into()).collect()) - } - - async fn get_largest_backfill_task_target_checkpoint( - &self, - prefix: &str, - ) -> Result, Error> { - let mut conn = self.pool.get().await?; - let cp: Option = dsl::progress_store - .select(columns::target_checkpoint) - // TODO: using like could be error prone, change the progress store schema to stare the task name properly. - .filter(columns::task_name.like(format!("{prefix} - %"))) - .filter(columns::target_checkpoint.ne(i64::MAX)) - .order_by(columns::target_checkpoint.desc()) - .first::(&mut conn) - .await - .optional()?; - Ok(cp.map(|c| c as u64)) - } - - async fn register_task( - &mut self, - task_name: String, - checkpoint: u64, - target_checkpoint: u64, - ) -> Result<(), anyhow::Error> { - let mut conn = self.pool.get().await?; - diesel::insert_into(schema::progress_store::table) - .values(models::ProgressStore { - task_name, - checkpoint: checkpoint as i64, - target_checkpoint: target_checkpoint as i64, - // Timestamp is defaulted to current time in DB if None - timestamp: None, - }) - .execute(&mut conn) - .await?; - Ok(()) - } - - async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> { - let mut conn = self.pool.get().await?; - diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name))) - .set(( - columns::checkpoint.eq(task.checkpoint as i64), - columns::target_checkpoint.eq(task.target_checkpoint as i64), - columns::timestamp.eq(now), - )) - .execute(&mut conn) - .await?; - Ok(()) - } -} - /// Data mapper impl #[derive(Clone)] pub struct SuiBridgeDataMapper {