Skip to content

Commit

Permalink
indexer: add lag logs & metrics with abs timestamp and cp seq (#17879)
Browse files Browse the repository at this point in the history
## Description 

title, for more granular analysis and tracking

## Test plan 

eyeball and CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
gegaowp authored May 23, 2024
1 parent a10b5a3 commit 8b7216f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 4 deletions.
30 changes: 26 additions & 4 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,25 @@ where
T: R2D2Connection + 'static,
{
async fn process_checkpoint(&self, checkpoint: CheckpointData) -> anyhow::Result<()> {
let cp_download_lag = chrono::Utc::now().timestamp_millis()
- checkpoint.checkpoint_summary.timestamp_ms as i64;
let time_now_ms = chrono::Utc::now().timestamp_millis();
let cp_download_lag = time_now_ms - checkpoint.checkpoint_summary.timestamp_ms as i64;
info!(
"checkpoint download lag for cp {}: {} ms",
checkpoint.checkpoint_summary.sequence_number, cp_download_lag
);
self.metrics.download_lag_ms.set(cp_download_lag);
self.metrics
.max_downloaded_checkpoint_sequence_number
.set(checkpoint.checkpoint_summary.sequence_number as i64);
self.metrics
.downloaded_checkpoint_timestamp_ms
.set(checkpoint.checkpoint_summary.timestamp_ms as i64);
info!(
"Indexer lag: downloaded checkpoint {} with time now {} and checkpoint time {}",
checkpoint.checkpoint_summary.sequence_number,
time_now_ms,
checkpoint.checkpoint_summary.timestamp_ms
);
let checkpoint_data = Self::index_checkpoint(
self.state.clone().into(),
checkpoint.clone(),
Expand Down Expand Up @@ -306,10 +318,20 @@ where
db_displays,
)
};
info!(checkpoint_seq, "Indexed one checkpoint.");
let time_now_ms = chrono::Utc::now().timestamp_millis();
metrics
.index_lag_ms
.set(chrono::Utc::now().timestamp_millis() - checkpoint.timestamp_ms as i64);
.set(time_now_ms - checkpoint.timestamp_ms as i64);
metrics
.max_indexed_checkpoint_sequence_number
.set(checkpoint.sequence_number as i64);
metrics
.indexed_checkpoint_timestamp_ms
.set(checkpoint.timestamp_ms as i64);
info!(
"Indexer lag: indexed checkpoint {} with time now {} and checkpoint time {}",
checkpoint.sequence_number, time_now_ms, checkpoint.timestamp_ms
);

Ok(CheckpointDataToCommit {
checkpoint,
Expand Down
38 changes: 38 additions & 0 deletions crates/sui-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ pub struct IndexerMetrics {
pub latest_tx_checkpoint_sequence_number: IntGauge,
pub latest_indexer_object_checkpoint_sequence_number: IntGauge,
pub latest_object_snapshot_sequence_number: IntGauge,
// max checkpoint sequence numbers on various stages of indexer data ingestion
pub max_downloaded_checkpoint_sequence_number: IntGauge,
pub max_indexed_checkpoint_sequence_number: IntGauge,
pub max_committed_checkpoint_sequence_number: IntGauge,
// the related timestamps of max checkpoint ^ on various stages
pub downloaded_checkpoint_timestamp_ms: IntGauge,
pub indexed_checkpoint_timestamp_ms: IntGauge,
pub committed_checkpoint_timestamp_ms: IntGauge,
// lag starting from the timestamp of the latest checkpoint to the current time
pub download_lag_ms: IntGauge,
pub index_lag_ms: IntGauge,
Expand Down Expand Up @@ -251,6 +259,36 @@ impl IndexerMetrics {
"Latest object snapshot sequence number from the Indexer",
registry,
).unwrap(),
max_downloaded_checkpoint_sequence_number: register_int_gauge_with_registry!(
"max_downloaded_checkpoint_sequence_number",
"Max downloaded checkpoint sequence number",
registry,
).unwrap(),
max_indexed_checkpoint_sequence_number: register_int_gauge_with_registry!(
"max_indexed_checkpoint_sequence_number",
"Max indexed checkpoint sequence number",
registry,
).unwrap(),
max_committed_checkpoint_sequence_number: register_int_gauge_with_registry!(
"max_committed_checkpoint_sequence_number",
"Max committed checkpoint sequence number",
registry,
).unwrap(),
downloaded_checkpoint_timestamp_ms: register_int_gauge_with_registry!(
"downloaded_checkpoint_timestamp_ms",
"Timestamp of the downloaded checkpoint",
registry,
).unwrap(),
indexed_checkpoint_timestamp_ms: register_int_gauge_with_registry!(
"indexed_checkpoint_timestamp_ms",
"Timestamp of the indexed checkpoint",
registry,
).unwrap(),
committed_checkpoint_timestamp_ms: register_int_gauge_with_registry!(
"committed_checkpoint_timestamp_ms",
"Timestamp of the committed checkpoint",
registry,
).unwrap(),
download_lag_ms: register_int_gauge_with_registry!(
"download_lag_ms",
"Lag of the latest checkpoint in milliseconds",
Expand Down
9 changes: 9 additions & 0 deletions crates/sui-indexer/src/store/pg_indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,15 @@ impl<T: R2D2Connection + 'static> PgIndexerStore<T> {
self.metrics
.db_commit_lag_ms
.set(time_now_ms - stored_checkpoint.timestamp_ms);
self.metrics.max_committed_checkpoint_sequence_number.set(
stored_checkpoint.sequence_number,
);
self.metrics.committed_checkpoint_timestamp_ms.set(
stored_checkpoint.timestamp_ms,
);
}
for stored_checkpoint in stored_checkpoint_chunk {
info!("Indexer lag: persisted checkpoint {} with time now {} and checkpoint time {}", stored_checkpoint.sequence_number, time_now_ms, stored_checkpoint.timestamp_ms);
}
}
Ok::<(), IndexerError>(())
Expand Down

0 comments on commit 8b7216f

Please sign in to comment.