Skip to content

Commit

Permalink
[indexer][watermarks][3/n] pruner updates watermarks lower bound (#19650
Browse files Browse the repository at this point in the history
)

## Description 

With the committer writing upper bounds, the pruner can now read from
watermarks and determine whether the lower bounds need to be updated.
Pruner does this by spawning a separate task, without touching the
extant pruning logic (so all things are as is.) It will ignore any
entries from watermarks that do not correspond to a `PrunableTable`
variant.

Part of a stack of PRs for watermarks

simplify setting up test indexer:
#19663
update pruner config: #19637
committer writes upper bounds
#19649
pruner writes lower bounds: #19650
pruner prunes (wip)

## Test plan 

How did you test the new or updated feature?

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
wlmyng authored Oct 17, 2024
1 parent 446d7d5 commit d239be8
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ CREATE TABLE watermarks
timestamp_ms BIGINT NOT NULL,
-- Column used by the pruner to track its true progress. Data at and below this watermark can
-- be immediately pruned.
pruner_lo BIGINT,
pruner_hi_inclusive BIGINT,
PRIMARY KEY (entity)
);
3 changes: 2 additions & 1 deletion crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ pub trait Handler<T>: Send + Sync {

/// The indexer writer operates on checkpoint data, which contains information on the current epoch,
/// checkpoint, and transaction. These three numbers form the watermark upper bound for each
/// committed table.
/// committed table. The reader and pruner are responsible for determining which of the three units
/// will be used for a particular table.
#[derive(Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub struct CommitterWatermark {
pub epoch: u64,
Expand Down
113 changes: 111 additions & 2 deletions crates/sui-indexer/src/handlers/pruner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use mysten_metrics::spawn_monitored_task;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;
Expand All @@ -23,8 +24,10 @@ pub struct Pruner {
pub metrics: IndexerMetrics,
}

/// Enum representing tables that the pruner is allowed to prune. The pruner will ignore any table
/// that is not listed here.
/// Enum representing tables that the pruner is allowed to prune. This corresponds to table names in
/// the database, and should be used in lieu of string literals. This enum is also meant to
/// facilitate the process of determining which unit (epoch, cp, or tx) should be used for the
/// table's range. Pruner will ignore any table that is not listed here.
#[derive(
Debug,
Eq,
Expand Down Expand Up @@ -69,6 +72,39 @@ pub enum PrunableTable {
PrunerCpWatermark,
}

impl PrunableTable {
pub fn select_reader_lo(&self, cp: u64, tx: u64) -> u64 {
match self {
PrunableTable::ObjectsHistory => cp,
PrunableTable::Transactions => tx,
PrunableTable::Events => tx,

PrunableTable::EventEmitPackage => tx,
PrunableTable::EventEmitModule => tx,
PrunableTable::EventSenders => tx,
PrunableTable::EventStructInstantiation => tx,
PrunableTable::EventStructModule => tx,
PrunableTable::EventStructName => tx,
PrunableTable::EventStructPackage => tx,

PrunableTable::TxAffectedAddresses => tx,
PrunableTable::TxAffectedObjects => tx,
PrunableTable::TxCallsPkg => tx,
PrunableTable::TxCallsMod => tx,
PrunableTable::TxCallsFun => tx,
PrunableTable::TxChangedObjects => tx,
PrunableTable::TxDigests => tx,
PrunableTable::TxInputObjects => tx,
PrunableTable::TxKinds => tx,
PrunableTable::TxRecipients => tx,
PrunableTable::TxSenders => tx,

PrunableTable::Checkpoints => cp,
PrunableTable::PrunerCpWatermark => cp,
}
}
}

impl Pruner {
/// Instantiates a pruner with default retention and overrides. Pruner will finalize the
/// retention policies so there is a value for every prunable table.
Expand Down Expand Up @@ -101,6 +137,15 @@ impl Pruner {
}

pub async fn start(&self, cancel: CancellationToken) -> IndexerResult<()> {
let store_clone = self.store.clone();
let retention_policies = self.retention_policies.clone();
let cancel_clone = cancel.clone();
spawn_monitored_task!(update_watermarks_lower_bounds_task(
store_clone,
retention_policies,
cancel_clone
));

let mut last_seen_max_epoch = 0;
// The first epoch that has not yet been pruned.
let mut next_prune_epoch = None;
Expand Down Expand Up @@ -177,3 +222,67 @@ impl Pruner {
Ok(())
}
}

/// Task to periodically query the `watermarks` table and update the lower bounds for all watermarks
/// if the entry exceeds epoch-level retention policy.
async fn update_watermarks_lower_bounds_task(
store: PgIndexerStore,
retention_policies: HashMap<PrunableTable, u64>,
cancel: CancellationToken,
) -> IndexerResult<()> {
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
tokio::select! {
_ = cancel.cancelled() => {
info!("Pruner watermark lower bound update task cancelled.");
return Ok(());
}
_ = interval.tick() => {
update_watermarks_lower_bounds(&store, &retention_policies, &cancel).await?;
}
}
}
}

/// Fetches all entries from the `watermarks` table, and updates the `reader_lo` for each entry if
/// its epoch range exceeds the respective retention policy.
async fn update_watermarks_lower_bounds(
store: &PgIndexerStore,
retention_policies: &HashMap<PrunableTable, u64>,
cancel: &CancellationToken,
) -> IndexerResult<()> {
let (watermarks, _) = store.get_watermarks().await?;
let mut lower_bound_updates = vec![];

for watermark in watermarks.iter() {
if cancel.is_cancelled() {
info!("Pruner watermark lower bound update task cancelled.");
return Ok(());
}

let Some(prunable_table) = watermark.entity() else {
continue;
};

let Some(epochs_to_keep) = retention_policies.get(&prunable_table) else {
error!(
"No retention policy found for prunable table {}",
prunable_table
);
continue;
};

if let Some(new_epoch_lo) = watermark.new_epoch_lo(*epochs_to_keep) {
lower_bound_updates.push((prunable_table, new_epoch_lo));
};
}

if !lower_bound_updates.is_empty() {
store
.update_watermarks_lower_bound(lower_bound_updates)
.await?;
info!("Finished updating lower bounds for watermarks");
}

Ok(())
}
33 changes: 29 additions & 4 deletions crates/sui-indexer/src/models/watermarks.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::str::FromStr;

use diesel::prelude::*;

use crate::{
handlers::CommitterWatermark,
handlers::{pruner::PrunableTable, CommitterWatermark},
schema::watermarks::{self},
};
use diesel::prelude::*;

/// Represents a row in the `watermarks` table.
#[derive(Queryable, Insertable, Default, QueryableByName)]
#[derive(Queryable, Insertable, Default, QueryableByName, Clone)]
#[diesel(table_name = watermarks, primary_key(entity))]
pub struct StoredWatermark {
/// The table governed by this watermark, i.e `epochs`, `checkpoints`, `transactions`.
Expand All @@ -35,7 +38,7 @@ pub struct StoredWatermark {
pub timestamp_ms: i64,
/// Column used by the pruner to track its true progress. Data at and below this watermark can
/// be immediately pruned.
pub pruner_lo: Option<i64>,
pub pruner_hi_inclusive: Option<i64>,
}

impl StoredWatermark {
Expand All @@ -48,4 +51,26 @@ impl StoredWatermark {
..StoredWatermark::default()
}
}

pub fn from_lower_bound_update(entity: &str, epoch_lo: u64, reader_lo: u64) -> Self {
StoredWatermark {
entity: entity.to_string(),
epoch_lo: epoch_lo as i64,
reader_lo: reader_lo as i64,
..StoredWatermark::default()
}
}

pub fn entity(&self) -> Option<PrunableTable> {
PrunableTable::from_str(&self.entity).ok()
}

/// Determine whether to set a new epoch lower bound based on the retention policy.
pub fn new_epoch_lo(&self, retention: u64) -> Option<u64> {
if self.epoch_lo as u64 + retention <= self.epoch_hi_inclusive as u64 {
Some((self.epoch_hi_inclusive as u64).saturating_sub(retention - 1))
} else {
None
}
}
}
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ diesel::table! {
tx_hi_inclusive -> Int8,
reader_lo -> Int8,
timestamp_ms -> Int8,
pruner_lo -> Nullable<Int8>,
pruner_hi_inclusive -> Nullable<Int8>,
}
}

Expand Down
12 changes: 12 additions & 0 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ use async_trait::async_trait;
use strum::IntoEnumIterator;

use crate::errors::IndexerError;
use crate::handlers::pruner::PrunableTable;
use crate::handlers::{CommitterWatermark, EpochToCommit, TransactionObjectChangesToCommit};
use crate::models::display::StoredDisplay;
use crate::models::obj_indices::StoredObjectVersion;
use crate::models::objects::{StoredDeletedObject, StoredObject};
use crate::models::raw_checkpoints::StoredRawCheckpoint;
use crate::models::watermarks::StoredWatermark;
use crate::types::{
EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex,
};
Expand Down Expand Up @@ -125,4 +127,14 @@ pub trait IndexerStore: Clone + Sync + Send + 'static {
) -> Result<(), IndexerError>
where
E::Iterator: Iterator<Item: AsRef<str>>;

/// Updates each watermark entry's lower bounds per the list of tables and their new epoch lower
/// bounds.
async fn update_watermarks_lower_bound(
&self,
watermarks: Vec<(PrunableTable, u64)>,
) -> Result<(), IndexerError>;

/// Load all watermark entries from the store, and the latest timestamp from the db.
async fn get_watermarks(&self) -> Result<(Vec<StoredWatermark>, i64), IndexerError>;
}
39 changes: 39 additions & 0 deletions crates/sui-indexer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,42 @@ where
})
.await
}

pub async fn read_with_retry<'a, Q, T>(
pool: &ConnectionPool,
timeout: Duration,
query: Q,
) -> Result<T, IndexerError>
where
Q: for<'r> FnOnce(
&'r mut AsyncPgConnection,
) -> ScopedBoxFuture<'a, 'r, Result<T, IndexerError>>
+ Send,
Q: Clone,
T: 'a,
{
let backoff = backoff::ExponentialBackoff {
max_elapsed_time: Some(timeout),
..Default::default()
};
backoff::future::retry(backoff, || async {
let mut connection = pool.get().await.map_err(|e| backoff::Error::Transient {
err: IndexerError::PostgresWriteError(e.to_string()),
retry_after: None,
})?;

connection
.build_transaction()
.read_only()
.run(query.clone())
.await
.map_err(|e| {
tracing::error!("Error with reading data from DB: {:?}, retrying...", e);
backoff::Error::Transient {
err: IndexerError::PostgresWriteError(e.to_string()),
retry_after: None,
}
})
})
.await
}
Loading

0 comments on commit d239be8

Please sign in to comment.