[bridge-indexer] split out storage component in bridge indexer into its own file (#19238)

## Description 

This is just moving things around. It makes the next PR smaller.

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
longbowlu authored and suiwombat committed Sep 16, 2024
1 parent 238db2f commit 6c8e384
Showing 4 changed files with 187 additions and 178 deletions.
1 change: 1 addition & 0 deletions crates/sui-bridge-indexer/src/lib.rs
@@ -13,6 +13,7 @@ pub mod metrics;
 pub mod models;
 pub mod postgres_manager;
 pub mod schema;
+pub mod storage;
 pub mod sui_transaction_handler;
 pub mod sui_transaction_queries;
 pub mod types;
3 changes: 2 additions & 1 deletion crates/sui-bridge-indexer/src/main.rs
@@ -24,7 +24,8 @@ use sui_bridge_indexer::config::IndexerConfig;
 use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper;
 use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
 use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store};
-use sui_bridge_indexer::sui_bridge_indexer::{PgBridgePersistent, SuiBridgeDataMapper};
+use sui_bridge_indexer::storage::PgBridgePersistent;
+use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper;
 use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource;
 use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
 use sui_bridge_indexer::sui_transaction_queries::start_sui_tx_polling_task;
180 changes: 180 additions & 0 deletions crates/sui-bridge-indexer/src/storage.rs
@@ -0,0 +1,180 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::{anyhow, Error};
use async_trait::async_trait;
use diesel::dsl::now;
use diesel::{ExpressionMethods, TextExpressionMethods};
use diesel::{OptionalExtension, QueryDsl, SelectableHelper};
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::AsyncConnection;
use diesel_async::RunQueryDsl;

use crate::postgres_manager::PgPool;
use crate::schema::progress_store::{columns, dsl};
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
use crate::{models, schema, ProcessedTxnData};
use sui_indexer_builder::indexer_builder::{IndexerProgressStore, Persistent};
use sui_indexer_builder::Task;

/// Persistent layer impl
#[derive(Clone)]
pub struct PgBridgePersistent {
    pool: PgPool,
}

impl PgBridgePersistent {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}

// TODO: this is shared between SUI and ETH, move to a different file.
#[async_trait]
impl Persistent<ProcessedTxnData> for PgBridgePersistent {
    async fn write(&self, data: Vec<ProcessedTxnData>) -> Result<(), Error> {
        if data.is_empty() {
            return Ok(());
        }
        let connection = &mut self.pool.get().await?;
        connection
            .transaction(|conn| {
                async move {
                    for d in data {
                        match d {
                            ProcessedTxnData::TokenTransfer(t) => {
                                diesel::insert_into(token_transfer::table)
                                    .values(&t.to_db())
                                    .on_conflict_do_nothing()
                                    .execute(conn)
                                    .await?;

                                if let Some(d) = t.to_data_maybe() {
                                    diesel::insert_into(token_transfer_data::table)
                                        .values(&d)
                                        .on_conflict_do_nothing()
                                        .execute(conn)
                                        .await?;
                                }
                            }
                            ProcessedTxnData::Error(e) => {
                                diesel::insert_into(sui_error_transactions::table)
                                    .values(&e.to_db())
                                    .on_conflict_do_nothing()
                                    .execute(conn)
                                    .await?;
                            }
                        }
                    }
                    Ok(())
                }
                .scope_boxed()
            })
            .await
    }
}

#[async_trait]
impl IndexerProgressStore for PgBridgePersistent {
    async fn load_progress(&self, task_name: String) -> anyhow::Result<u64> {
        let mut conn = self.pool.get().await?;
        let cp: Option<models::ProgressStore> = dsl::progress_store
            .find(&task_name)
            .select(models::ProgressStore::as_select())
            .first(&mut conn)
            .await
            .optional()?;
        Ok(cp
            .ok_or(anyhow!("Cannot find progress for task {task_name}"))?
            .checkpoint as u64)
    }

    async fn save_progress(
        &mut self,
        task_name: String,
        checkpoint_number: u64,
    ) -> anyhow::Result<()> {
        let mut conn = self.pool.get().await?;
        diesel::insert_into(schema::progress_store::table)
            .values(&models::ProgressStore {
                task_name,
                checkpoint: checkpoint_number as i64,
                // Target checkpoint and timestamp will only be written for new entries
                target_checkpoint: i64::MAX,
                // Timestamp is defaulted to current time in DB if None
                timestamp: None,
            })
            .on_conflict(dsl::task_name)
            .do_update()
            .set((
                columns::checkpoint.eq(checkpoint_number as i64),
                columns::timestamp.eq(now),
            ))
            .execute(&mut conn)
            .await?;
        Ok(())
    }

    async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
        let mut conn = self.pool.get().await?;
        // Get all unfinished tasks.
        let cp: Vec<models::ProgressStore> = dsl::progress_store
            // TODO: using `like` could be error prone; change the progress store schema to store the task name properly.
            .filter(columns::task_name.like(format!("{prefix} - %")))
            .filter(columns::checkpoint.lt(columns::target_checkpoint))
            .order_by(columns::target_checkpoint.desc())
            .load(&mut conn)
            .await?;
        Ok(cp.into_iter().map(|d| d.into()).collect())
    }

    async fn get_largest_backfill_task_target_checkpoint(
        &self,
        prefix: &str,
    ) -> Result<Option<u64>, Error> {
        let mut conn = self.pool.get().await?;
        let cp: Option<i64> = dsl::progress_store
            .select(columns::target_checkpoint)
            // TODO: using `like` could be error prone; change the progress store schema to store the task name properly.
            .filter(columns::task_name.like(format!("{prefix} - %")))
            .filter(columns::target_checkpoint.ne(i64::MAX))
            .order_by(columns::target_checkpoint.desc())
            .first::<i64>(&mut conn)
            .await
            .optional()?;
        Ok(cp.map(|c| c as u64))
    }

    async fn register_task(
        &mut self,
        task_name: String,
        checkpoint: u64,
        target_checkpoint: u64,
    ) -> Result<(), anyhow::Error> {
        let mut conn = self.pool.get().await?;
        diesel::insert_into(schema::progress_store::table)
            .values(models::ProgressStore {
                task_name,
                checkpoint: checkpoint as i64,
                target_checkpoint: target_checkpoint as i64,
                // Timestamp is defaulted to current time in DB if None
                timestamp: None,
            })
            .execute(&mut conn)
            .await?;
        Ok(())
    }

    async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
        let mut conn = self.pool.get().await?;
        diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
            .set((
                columns::checkpoint.eq(task.checkpoint as i64),
                columns::target_checkpoint.eq(task.target_checkpoint as i64),
                columns::timestamp.eq(now),
            ))
            .execute(&mut conn)
            .await?;
        Ok(())
    }
}
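
For orientation (not part of this diff), here is a minimal sketch of how a caller such as main.rs might drive the relocated PgBridgePersistent. It only uses methods defined in the file above; the task-name convention and the idea that the PgPool comes from postgres_manager::get_connection_pool are illustrative assumptions.

// Hypothetical usage sketch, not part of this commit.
use sui_bridge_indexer::postgres_manager::PgPool;
use sui_bridge_indexer::storage::PgBridgePersistent;
use sui_indexer_builder::indexer_builder::IndexerProgressStore;

async fn track_progress(pool: PgPool) -> anyhow::Result<()> {
    // Build the persistent layer from a connection pool.
    let mut store = PgBridgePersistent::new(pool);

    // Register a task with a starting and target checkpoint
    // (the "<prefix> - <name>" task-name format is assumed here).
    store
        .register_task("sui - example backfill".to_string(), 0, 10_000)
        .await?;

    // Advance the task as checkpoints are processed.
    store
        .save_progress("sui - example backfill".to_string(), 1_234)
        .await?;

    // Read the last saved checkpoint back, e.g. on restart.
    let checkpoint = store
        .load_progress("sui - example backfill".to_string())
        .await?;
    println!("resumed at checkpoint {checkpoint}");
    Ok(())
}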
