Skip to content

Commit

Permalink
[indexer][watermarks][1/n] Modify PruningOptions to point to a toml f…
Browse files Browse the repository at this point in the history
…ile of epochs_to_keep and optional per-table overrides (#19637)
  • Loading branch information
wlmyng authored Oct 8, 2024
1 parent 906dce8 commit ff9762e
Show file tree
Hide file tree
Showing 11 changed files with 408 additions and 60 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions crates/sui-graphql-rpc/src/test_infra/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use sui_graphql_rpc_client::simple_client::SimpleClient;
use sui_indexer::config::PruningOptions;
pub use sui_indexer::config::RetentionConfig;
pub use sui_indexer::config::SnapshotLagConfig;
use sui_indexer::errors::IndexerError;
use sui_indexer::store::PgIndexerStore;
Expand Down Expand Up @@ -151,7 +151,7 @@ pub async fn start_network_cluster() -> NetworkCluster {
pub async fn serve_executor(
executor: Arc<dyn RestStateReader + Send + Sync>,
snapshot_config: Option<SnapshotLagConfig>,
epochs_to_keep: Option<u64>,
retention_config: Option<RetentionConfig>,
data_ingestion_path: PathBuf,
) -> ExecutorCluster {
let database = TempDb::new().unwrap();
Expand Down Expand Up @@ -184,7 +184,7 @@ pub async fn serve_executor(
let (pg_store, pg_handle, _) = start_indexer_writer_for_testing(
db_url,
Some(snapshot_config.clone()),
Some(PruningOptions { epochs_to_keep }),
retention_config,
Some(data_ingestion_path),
Some(cancellation_token.clone()),
)
Expand Down
3 changes: 3 additions & 0 deletions crates/sui-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
strum.workspace = true
strum_macros.workspace = true
tap.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true, features = ["rt"] }
toml.workspace = true
tracing.workspace = true
url.workspace = true

Expand Down
9 changes: 5 additions & 4 deletions crates/sui-indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ See the [docs](https://docs.sui.io/guides/developer/getting-started/local-networ

Start a local network using the `sui` binary:
```sh
cargo run --bin sui -- start --with-faucet --force-regenesis
```

If you want to run a local network with the indexer enabled (note that `libpq` is required), you can run the following command after following the steps in the next section to set up an indexer DB:
Expand Down Expand Up @@ -65,11 +65,12 @@ cargo run --bin sui-indexer -- --db-url "<DATABASE_URL>" --rpc-client-url "https
```
cargo run --bin sui-indexer -- --db-url "<DATABASE_URL>" --rpc-client-url "https://fullnode.devnet.sui.io:443" --rpc-server-worker
```
More flags info can be found in this [file](src/main.rs#L41).

### DB reset
When making db-related changes, you may find yourself having to run migrations and reset dbs often. The commands below are how you can invoke these actions.
```sh
diesel database reset --database-url="<DATABASE_URL>"
cargo run --bin sui-indexer -- --database-url "<DATABASE_URL>" reset-database --force
```

## Steps to run locally (TiDB)
Expand Down
226 changes: 222 additions & 4 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::backfill::BackfillTaskKind;
use crate::db::ConnectionPoolConfig;
use crate::{backfill::BackfillTaskKind, handlers::pruner::PrunableTable};
use clap::{Args, Parser, Subcommand};
use std::{net::SocketAddr, path::PathBuf};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, net::SocketAddr, path::PathBuf};
use strum::IntoEnumIterator;
use sui_json_rpc::name_service::NameServiceConfig;
use sui_types::base_types::{ObjectID, SuiAddress};
use url::Url;

/// The primary purpose of objects_history is to serve consistency query.
/// A short retention is sufficient.
const OBJECTS_HISTORY_EPOCHS_TO_KEEP: u64 = 2;

#[derive(Parser, Clone, Debug)]
#[clap(
name = "Sui indexer",
Expand Down Expand Up @@ -208,8 +214,93 @@ pub enum Command {

#[derive(Args, Default, Debug, Clone)]
pub struct PruningOptions {
    /// Path to TOML file containing configuration for retention policies.
    /// When absent, no retention config is loaded and pruning is disabled.
    #[arg(long)]
    pub pruning_config_path: Option<PathBuf>,
}

/// Represents the default retention policy and overrides for prunable tables. Instantiated only if
/// `PruningOptions` is provided on indexer start.
/// Represents the default retention policy and overrides for prunable tables. Instantiated only if
/// `PruningOptions` is provided on indexer start. Deserialized from a TOML file via
/// `PruningOptions::load_from_file`; all retention values are counted in epochs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    /// Default retention policy for all tables.
    pub epochs_to_keep: u64,
    /// A map of tables to their respective retention policies that will override the default.
    /// Prunable tables not named here will use the default retention policy.
    /// `#[serde(default)]` makes the `[overrides]` section optional in the TOML file.
    #[serde(default)]
    pub overrides: HashMap<PrunableTable, u64>,
}

impl PruningOptions {
    /// Load the default retention policy and any per-table overrides from the TOML file at
    /// `pruning_config_path`. Returns `None` when no path was supplied.
    ///
    /// Panics if the file cannot be read, fails to parse as `RetentionConfig`, or contains a
    /// zero retention value (default or override) — a retention of 0 epochs is never valid.
    pub fn load_from_file(&self) -> Option<RetentionConfig> {
        let Some(path) = self.pruning_config_path.as_ref() else {
            return None;
        };

        let raw = std::fs::read_to_string(path)
            .expect("Failed to read default retention policy and overrides from file");
        let parsed: RetentionConfig = toml::de::from_str(&raw)
            .expect("Failed to parse into RetentionConfig struct");

        // Validate eagerly so a bad config aborts startup instead of surfacing mid-pruning.
        assert!(
            parsed.epochs_to_keep > 0,
            "Default retention must be greater than 0"
        );
        assert!(
            parsed.overrides.values().all(|&epochs| epochs > 0),
            "All retention overrides must be greater than 0"
        );

        Some(parsed)
    }
}

impl RetentionConfig {
    /// Create a new `RetentionConfig` with the specified default retention and overrides. Call
    /// `retention_policies()` on the instance to produce the full mapping, in which every
    /// prunable table without an override falls back to the default retention policy.
    pub fn new(epochs_to_keep: u64, overrides: HashMap<PrunableTable, u64>) -> Self {
        Self {
            epochs_to_keep,
            overrides,
        }
    }

    /// Testing helper: default retention for every table, except `objects_history`, which keeps
    /// its shorter consistency-query retention (`OBJECTS_HISTORY_EPOCHS_TO_KEEP`).
    pub fn new_with_default_retention_only_for_testing(epochs_to_keep: u64) -> Self {
        let mut overrides = HashMap::new();
        overrides.insert(
            PrunableTable::ObjectsHistory,
            OBJECTS_HISTORY_EPOCHS_TO_KEEP,
        );

        // Fix: previously this passed `HashMap::new()`, silently discarding the `overrides`
        // map built above. The net policies were unchanged (retention_policies() applies the
        // same objects_history default), but the constructed map was dead code.
        Self::new(epochs_to_keep, overrides)
    }

    /// Consumes this struct to produce a full mapping of every prunable table and its retention
    /// policy. By default, every prunable table will have the default retention policy from
    /// `epochs_to_keep`. Some tables like `objects_history` will observe a different default
    /// retention policy. These default values are overridden by any entries in `overrides`.
    pub fn retention_policies(self) -> HashMap<PrunableTable, u64> {
        let RetentionConfig {
            epochs_to_keep,
            mut overrides,
        } = self;

        for table in PrunableTable::iter() {
            let default_retention = match table {
                // objects_history only serves consistency queries; a short retention suffices.
                PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
                _ => epochs_to_keep,
            };

            // `entry().or_insert()` keeps any explicit override and fills in the default
            // only for tables not already present.
            overrides.entry(table).or_insert(default_retention);
        }

        overrides
    }
}

#[derive(Args, Debug, Clone)]
Expand Down Expand Up @@ -290,7 +381,9 @@ impl Default for RestoreConfig {
#[cfg(test)]
mod test {
use super::*;
use std::io::Write;
use tap::Pipe;
use tempfile::NamedTempFile;

fn parse_args<'a, T>(args: impl IntoIterator<Item = &'a str>) -> Result<T, clap::error::Error>
where
Expand Down Expand Up @@ -354,4 +447,129 @@ mod test {
// fullnode rpc url must be present
parse_args::<JsonRpcConfig>([]).unwrap_err();
}

#[test]
fn pruning_options_with_objects_history_override() {
    // A config that overrides both objects_history and transactions.
    let toml_content = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
"#;
    let mut file = NamedTempFile::new().unwrap();
    file.write_all(toml_content.as_bytes()).unwrap();

    let options = PruningOptions {
        pruning_config_path: Some(file.path().to_path_buf()),
    };
    let config = options.load_from_file().unwrap();

    // Parsed values: default of 5 plus exactly the two overrides from the file.
    assert_eq!(config.epochs_to_keep, 5);
    assert_eq!(
        config
            .overrides
            .get(&PrunableTable::ObjectsHistory)
            .copied(),
        Some(10)
    );
    assert_eq!(
        config.overrides.get(&PrunableTable::Transactions).copied(),
        Some(20)
    );
    assert_eq!(config.overrides.len(), 2);

    // Every prunable table must resolve to a policy: explicit overrides win,
    // everything else (including objects_history, since it was overridden) uses the default.
    let policies = config.retention_policies();
    for table in PrunableTable::iter() {
        let retention = policies
            .get(&table)
            .copied()
            .unwrap_or_else(|| panic!("Expected a retention policy for table {:?}", table));

        let expected = match table {
            PrunableTable::ObjectsHistory => 10,
            PrunableTable::Transactions => 20,
            _ => 5,
        };
        assert_eq!(retention, expected);
    }
}

#[test]
fn pruning_options_no_objects_history_override() {
    // A config that does NOT override objects_history, so it should fall back to
    // OBJECTS_HISTORY_EPOCHS_TO_KEEP rather than the general default.
    let toml_content = r#"
epochs_to_keep = 5
[overrides]
tx_affected_addresses = 10
transactions = 20
"#;
    let mut file = NamedTempFile::new().unwrap();
    file.write_all(toml_content.as_bytes()).unwrap();

    let options = PruningOptions {
        pruning_config_path: Some(file.path().to_path_buf()),
    };
    let config = options.load_from_file().unwrap();

    // Parsed values: default of 5 plus exactly the two overrides from the file.
    assert_eq!(config.epochs_to_keep, 5);
    assert_eq!(
        config
            .overrides
            .get(&PrunableTable::TxAffectedAddresses)
            .copied(),
        Some(10)
    );
    assert_eq!(
        config.overrides.get(&PrunableTable::Transactions).copied(),
        Some(20)
    );
    assert_eq!(config.overrides.len(), 2);

    let policies = config.retention_policies();
    for table in PrunableTable::iter() {
        let retention = policies
            .get(&table)
            .copied()
            .unwrap_or_else(|| panic!("Expected a retention policy for table {:?}", table));

        let expected = match table {
            // Not overridden, so the table-specific default applies.
            PrunableTable::ObjectsHistory => OBJECTS_HISTORY_EPOCHS_TO_KEEP,
            PrunableTable::TxAffectedAddresses => 10,
            PrunableTable::Transactions => 20,
            _ => 5,
        };
        assert_eq!(retention, expected);
    }
}

#[test]
fn test_invalid_pruning_config_file() {
    // `invalid_table` is not a PrunableTable variant, so deserialization must fail
    // rather than silently ignoring the unknown key.
    let toml_str = r#"
epochs_to_keep = 5
[overrides]
objects_history = 10
transactions = 20
invalid_table = 30
"#;

    let result = toml::from_str::<RetentionConfig>(toml_str);
    assert!(result.is_err(), "Expected an error, but parsing succeeded");

    let err = result.unwrap_err();
    assert!(
        err.to_string().contains("unknown variant `invalid_table`"),
        "Error message doesn't mention the invalid table"
    );
}
}
Loading

0 comments on commit ff9762e

Please sign in to comment.