Skip to content

Commit

Permalink
progress saver
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Sep 6, 2024
1 parent de51cba commit c6782f3
Show file tree
Hide file tree
Showing 8 changed files with 416 additions and 94 deletions.
8 changes: 0 additions & 8 deletions crates/sui-bridge-indexer/src/eth_bridge_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,6 @@ impl Datasource<RawEthData> for EthSubscriptionDatasource {
fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

pub struct EthSyncDatasource {
Expand Down Expand Up @@ -288,10 +284,6 @@ impl Datasource<RawEthData> for EthSyncDatasource {
fn get_tasks_processed_checkpoints_metric(&self) -> &IntCounterVec {
&self.indexer_metrics.tasks_processed_checkpoints
}

fn get_live_task_checkpoint_metric(&self) -> &IntGaugeVec {
&self.indexer_metrics.live_task_current_checkpoint
}
}

#[derive(Clone)]
Expand Down
24 changes: 20 additions & 4 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use sui_bridge_indexer::config::IndexerConfig;
use sui_bridge_indexer::eth_bridge_indexer::EthDataMapper;
use sui_bridge_indexer::metrics::BridgeIndexerMetrics;
use sui_bridge_indexer::postgres_manager::{get_connection_pool, read_sui_progress_store};
use sui_bridge_indexer::storage::PgBridgePersistent;
use sui_bridge_indexer::storage::{
OutOfOrderSaveAfterDurationPolicy, PgBridgePersistent, ProgressSavingPolicy,
SaveAfterDurationPolicy,
};
use sui_bridge_indexer::sui_bridge_indexer::SuiBridgeDataMapper;
use sui_bridge_indexer::sui_datasource::SuiCheckpointDatasource;
use sui_bridge_indexer::sui_transaction_handler::handle_sui_transactions_loop;
Expand Down Expand Up @@ -72,7 +75,20 @@ async fn main() -> Result<()> {
let bridge_metrics = Arc::new(BridgeMetrics::new(&registry));

let db_url = config.db_url.clone();
let datastore = PgBridgePersistent::new(get_connection_pool(db_url.clone()).await);
let datastore = PgBridgePersistent::new(
get_connection_pool(db_url.clone()).await,
ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
indexer_meterics.clone(),
);
let datastore_with_out_of_order_source = PgBridgePersistent::new(
get_connection_pool(db_url.clone()).await,
ProgressSavingPolicy::OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy::new(
tokio::time::Duration::from_secs(30),
)),
indexer_meterics.clone(),
);

let eth_client: Arc<EthClient<MeteredEthHttpProvier>> = Arc::new(
EthClient::<MeteredEthHttpProvier>::new(
Expand Down Expand Up @@ -119,7 +135,7 @@ async fn main() -> Result<()> {
EthDataMapper {
metrics: indexer_meterics.clone(),
},
datastore.clone(),
datastore,
)
.with_backfill_strategy(BackfillStrategy::Partitioned { task_size: 1000 })
.disable_live_task()
Expand All @@ -146,7 +162,7 @@ async fn main() -> Result<()> {
SuiBridgeDataMapper {
metrics: indexer_meterics.clone(),
},
datastore,
datastore_with_out_of_order_source,
)
.build();
indexer.start().await?;
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-bridge-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct BridgeIndexerMetrics {
pub(crate) last_synced_eth_block: IntGauge,
pub(crate) tasks_remaining_checkpoints: IntGaugeVec,
pub(crate) tasks_processed_checkpoints: IntCounterVec,
pub(crate) live_task_current_checkpoint: IntGaugeVec,
pub(crate) tasks_current_checkpoints: IntGaugeVec,
}

impl BridgeIndexerMetrics {
Expand Down Expand Up @@ -115,9 +115,9 @@ impl BridgeIndexerMetrics {
registry,
)
.unwrap(),
live_task_current_checkpoint: register_int_gauge_vec_with_registry!(
"bridge_indexer_live_task_current_checkpoint",
"Current checkpoint of live task",
tasks_current_checkpoints: register_int_gauge_vec_with_registry!(
"bridge_indexer_tasks_current_checkpoints",
"Current checkpoint for each task",
&["task_name"],
registry,
)
Expand Down
Loading

0 comments on commit c6782f3

Please sign in to comment.