diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 5f8f9fe8..c6c03737 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -199,7 +199,7 @@ jobs:
           name: jito_tip_router_program.so
           path: integration_tests/tests/fixtures/
       - uses: taiki-e/install-action@nextest
-      - run: cargo nextest run --all-features -E 'not test(ledger_utils::tests::test_get_bank_from_ledger_success) and not test(test_meta_merkle_creation_from_ledger)'
+      - run: cargo nextest run --all-features -E 'not test(ledger_utils::tests)'
         env:
           SBF_OUT_DIR: ${{ github.workspace }}/integration_tests/tests/fixtures
diff --git a/Cargo.lock b/Cargo.lock
index a817b4ed..3cc0a454 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10147,7 +10147,6 @@ dependencies = [
  "im",
  "itertools 0.11.0",
  "jito-bytemuck",
- "jito-restaking-program",
  "jito-tip-distribution-sdk",
  "jito-tip-payment-sdk",
  "jito-tip-router-client",
diff --git a/format.sh b/format.sh
index 1c71d0eb..298a9d1f 100755
--- a/format.sh
+++ b/format.sh
@@ -14,7 +14,7 @@ cargo fmt --all
 print_executing "cargo nextest run --all-features"
 cargo build-sbf --sbf-out-dir integration_tests/tests/fixtures
-SBF_OUT_DIR=integration_tests/tests/fixtures cargo nextest run --all-features -E 'not test(ledger_utils::tests::test_get_bank_from_ledger_success) and not test(test_meta_merkle_creation_from_ledger)'
+SBF_OUT_DIR=integration_tests/tests/fixtures cargo nextest run --all-features -E 'not test(ledger_utils::tests::test_get_bank_from_ledger_success)'

 # Code coverage only runs with flag
 if [[ "$*" == *"--code-coverage"* ]]; then
diff --git a/meta_merkle_tree/src/generated_merkle_tree.rs b/meta_merkle_tree/src/generated_merkle_tree.rs
index ab0d58be..a671f3a1 100644
--- a/meta_merkle_tree/src/generated_merkle_tree.rs
+++ b/meta_merkle_tree/src/generated_merkle_tree.rs
@@ -1,4 +1,8 @@
-use std::{fs::File, io::BufReader, path::PathBuf};
+use std::{
+    fs::File,
+    io::{BufReader, Write},
+    path::PathBuf,
+};

 use jito_tip_distribution_sdk::{
     jito_tip_distribution::ID as TIP_DISTRIBUTION_ID, CLAIM_STATUS_SEED,
@@ -119,6 +123,14 @@ impl GeneratedMerkleTreeCollection {
         Ok(tree)
     }
+
+    /// Write a GeneratedMerkleTreeCollection to a filepath
+    pub fn write_to_file(&self, path: &PathBuf) -> Result<(), MerkleRootGeneratorError> {
+        let serialized = serde_json::to_string_pretty(&self)?;
+        let mut file = File::create(path)?;
+        file.write_all(serialized.as_bytes())?;
+        Ok(())
+    }
 }

 #[derive(Clone, Eq, Debug, Hash, PartialEq, Deserialize, Serialize)]
@@ -329,6 +341,24 @@ pub struct StakeMetaCollection {
     pub slot: Slot,
 }

+impl StakeMetaCollection {
+    /// Load a serialized StakeMetaCollection from a file path
+    pub fn new_from_file(path: &PathBuf) -> Result<Self, MerkleRootGeneratorError> {
+        let file = File::open(path)?;
+        let reader = BufReader::new(file);
+        let tree: Self = serde_json::from_reader(reader)?;
+
+        Ok(tree)
+    }
+
+    /// Write a StakeMetaCollection to a filepath
+    pub fn write_to_file(&self, path: &PathBuf) {
+        let serialized = serde_json::to_string_pretty(&self).unwrap();
+        let mut file = File::create(path).unwrap();
+        file.write_all(serialized.as_bytes()).unwrap();
+    }
+}
+
 #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)]
 pub struct StakeMeta {
     #[serde(with = "pubkey_string_conversion")]
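The new `write_to_file` / `new_from_file` helpers are a plain serde_json round-trip. A minimal sketch of the same pattern, assuming serde and serde_json and using a hypothetical `Collection` stand-in for the real types:

```rust
use std::{
    fs::File,
    io::{BufReader, Write},
    path::PathBuf,
};

use serde::{Deserialize, Serialize};

// Hypothetical stand-in for GeneratedMerkleTreeCollection / StakeMetaCollection.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Collection {
    epoch: u64,
    slot: u64,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = PathBuf::from("collection.json");
    let original = Collection { epoch: 749, slot: 323_710_005 };

    // write_to_file: pretty-print JSON and persist it.
    let mut file = File::create(&path)?;
    file.write_all(serde_json::to_string_pretty(&original)?.as_bytes())?;

    // new_from_file: stream the JSON back through a buffered reader.
    let loaded: Collection = serde_json::from_reader(BufReader::new(File::open(&path)?))?;
    assert_eq!(original, loaded);
    Ok(())
}
```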
diff --git a/meta_merkle_tree/src/meta_merkle_tree.rs b/meta_merkle_tree/src/meta_merkle_tree.rs
index a6be8fff..5a8ff937 100644
--- a/meta_merkle_tree/src/meta_merkle_tree.rs
+++ b/meta_merkle_tree/src/meta_merkle_tree.rs
@@ -90,10 +90,11 @@ impl MetaMerkleTree {
     }

     /// Write a merkle tree to a filepath
-    pub fn write_to_file(&self, path: &PathBuf) {
-        let serialized = serde_json::to_string_pretty(&self).unwrap();
-        let mut file = File::create(path).unwrap();
-        file.write_all(serialized.as_bytes()).unwrap();
+    pub fn write_to_file(&self, path: &PathBuf) -> Result<()> {
+        let serialized = serde_json::to_string_pretty(&self)?;
+        let mut file = File::create(path)?;
+        file.write_all(serialized.as_bytes())?;
+        Ok(())
     }

     pub fn get_node(&self, tip_distribution_account: &Pubkey) -> TreeNode {
@@ -241,7 +242,7 @@ mod tests {
         let path = PathBuf::from("merkle_tree.json");

         // serialize merkle distributor to file
-        merkle_distributor_info.write_to_file(&path);
+        merkle_distributor_info.write_to_file(&path).unwrap();

         // now test we can successfully read from file
         let merkle_distributor_read: MetaMerkleTree = MetaMerkleTree::new_from_file(&path).unwrap();
diff --git a/tip-router-operator-cli/Cargo.toml b/tip-router-operator-cli/Cargo.toml
index 4f03b39b..fd79641f 100644
--- a/tip-router-operator-cli/Cargo.toml
+++ b/tip-router-operator-cli/Cargo.toml
@@ -17,7 +17,6 @@ hex = "0.4"
 im = "15.1"
 itertools = "0.11"
 jito-bytemuck = { workspace = true }
-jito-restaking-program = { workspace = true }
 jito-tip-distribution-sdk = { workspace = true }
 jito-tip-payment-sdk = { workspace = true }
 jito-tip-router-client = { workspace = true }
diff --git a/tip-router-operator-cli/src/backup_snapshots.rs b/tip-router-operator-cli/src/backup_snapshots.rs
index bb37efb7..3899c9e5 100644
--- a/tip-router-operator-cli/src/backup_snapshots.rs
+++ b/tip-router-operator-cli/src/backup_snapshots.rs
@@ -7,6 +7,7 @@ use std::time::Duration;
 use tokio::time;

 use crate::process_epoch::get_previous_epoch_last_slot;
+use crate::{merkle_tree_collection_file_name, meta_merkle_tree_file_name, stake_meta_file_name};

 const MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH: usize = 3;

@@ -14,7 +15,7 @@ const MAXIMUM_BACKUP_INCREMENTAL_SNAPSHOTS_PER_EPOCH: usize = 3;
 #[derive(Debug)]
 pub struct SnapshotInfo {
     path: PathBuf,
-    _start_slot: u64,
+    _start_slot: Option<u64>,
     pub end_slot: u64,
 }

@@ -23,27 +24,69 @@ impl SnapshotInfo {
     pub fn from_path(path: PathBuf) -> Option<Self> {
         let file_name = path.file_name()?.to_str()?;

-        // Only try to parse if it's an incremental snapshot
-        if !file_name.starts_with("incremental-snapshot-") {
-            return None;
-        }
-
         // Split on hyphens and take the slot numbers
-        // Format: incremental-snapshot-<start_slot>-<end_slot>-<hash>.tar.zst
         let parts: Vec<&str> = file_name.split('-').collect();

-        if parts.len() < 5 {
-            return None;
+        if parts.len() == 5 {
+            // incremental snapshot
+            // Format: incremental-snapshot-<start_slot>-<end_slot>-<hash>.tar.zst
+            // Parse start and end slots
+            let start_slot: u64 = parts[2].parse().ok()?;
+            let end_slot = parts[3].parse().ok()?;
+
+            Some(Self {
+                path,
+                _start_slot: Some(start_slot),
+                end_slot,
+            })
+        } else if parts.len() == 3 {
+            // Full snapshot
+            // Format: snapshot-<slot>-<hash>.tar.zst
+            let end_slot = parts[1].parse().ok()?;
+
+            Some(Self {
+                path,
+                _start_slot: None,
+                end_slot,
+            })
+        } else {
+            None
         }
+    }
+
+    pub const fn is_incremental(&self) -> bool {
+        self._start_slot.is_some()
+    }
+}
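The reworked `SnapshotInfo::from_path` distinguishes the two archive layouts purely by the number of hyphen-separated parts. A standalone sketch of that parsing rule, with a hypothetical `parse_end_slot` helper:

```rust
// Returns (start_slot, end_slot): start_slot is Some only for incrementals.
fn parse_end_slot(file_name: &str) -> Option<(Option<u64>, u64)> {
    let parts: Vec<&str> = file_name.split('-').collect();
    match parts.len() {
        // incremental-snapshot-<start_slot>-<end_slot>-<hash>.tar.zst
        5 => Some((Some(parts[2].parse().ok()?), parts[3].parse().ok()?)),
        // snapshot-<slot>-<hash>.tar.zst
        3 => Some((None, parts[1].parse().ok()?)),
        _ => None,
    }
}

fn main() {
    assert_eq!(
        parse_end_slot("incremental-snapshot-100-150-hash1.tar.zst"),
        Some((Some(100), 150))
    );
    assert_eq!(
        parse_end_slot("snapshot-323710005-hash.tar.zst"),
        Some((None, 323710005))
    );
    // Wrong shape or non-numeric slot fields both yield None.
    assert_eq!(parse_end_slot("not-a-snapshot.txt"), None);
}
```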
+
+/// Represents a saved TipRouter file with epoch information
+#[derive(Debug)]
+pub struct SavedTipRouterFile {
+    path: PathBuf,
+    epoch: u64,
+}
-
-        // Parse start and end slots
-        let start_slot = parts[2].parse().ok()?;
-        let end_slot = parts[3].parse().ok()?;
+impl SavedTipRouterFile {
+    /// Try to parse a TipRouter saved filename with epoch information
+    pub fn from_path(path: PathBuf) -> Option<Self> {
+        let file_name = path.file_name()?.to_str()?;
-        Some(Self {
-            path,
-            _start_slot: start_slot,
-            end_slot,
-        })
+        // Split on underscore to get epoch
+        let parts: Vec<&str> = file_name.split('_').collect();
+        let epoch: u64 = parts[0].parse().ok()?;
+
+        let is_tip_router_file = [
+            stake_meta_file_name(epoch),
+            merkle_tree_collection_file_name(epoch),
+            meta_merkle_tree_file_name(epoch),
+        ]
+        .iter()
+        .any(|x| *x == file_name);
+
+        if is_tip_router_file {
+            Some(Self { path, epoch })
+        } else {
+            None
+        }
     }
 }

@@ -52,6 +95,8 @@ pub struct BackupSnapshotMonitor {
     snapshots_dir: PathBuf,
     backup_dir: PathBuf,
     override_target_slot: Option<u64>,
+    save_path: PathBuf,
+    num_monitored_epochs: u64,
 }

 impl BackupSnapshotMonitor {
@@ -60,12 +105,16 @@ impl BackupSnapshotMonitor {
         snapshots_dir: PathBuf,
         backup_dir: PathBuf,
         override_target_slot: Option<u64>,
+        save_path: PathBuf,
+        num_monitored_epochs: u64,
     ) -> Self {
         Self {
             rpc_client: RpcClient::new(rpc_url.to_string()),
             snapshots_dir,
             backup_dir,
             override_target_slot,
+            save_path,
+            num_monitored_epochs,
         }
     }

@@ -94,7 +143,7 @@
                 let before_target_slot = snap.end_slot <= target_slot;
                 let in_same_epoch = (snap.end_slot / DEFAULT_SLOTS_PER_EPOCH)
                     == (target_slot / DEFAULT_SLOTS_PER_EPOCH);
-                before_target_slot && in_same_epoch
+                snap.is_incremental() && before_target_slot && in_same_epoch
             })
             .max_by_key(|snap| snap.end_slot)
             .map(|snap| snap.path)
@@ -174,6 +223,25 @@
         Ok(())
     }

+    /// Deletes TipRouter saved files that were created <= epoch
+    fn evict_saved_files(&self, epoch: u64) -> Result<()> {
+        let dir_entries = std::fs::read_dir(&self.save_path)?;
+        // Filter the files and evict files that are <= epoch
+        dir_entries
+            .filter_map(Result::ok)
+            .filter_map(|entry| SavedTipRouterFile::from_path(entry.path()))
+            .filter(|saved_file| saved_file.epoch <= epoch)
+            .try_for_each(|saved_file| {
+                log::debug!(
+                    "Removing old saved file from epoch {}: {:?}",
+                    saved_file.epoch,
+                    saved_file.path
+                );
+                std::fs::remove_file(saved_file.path.as_path())
+            })?;
+        Ok(())
+    }
+
     fn evict_same_epoch_incremental(&self, target_slot: u64) -> Result<()> {
         let slots_per_epoch = DEFAULT_SLOTS_PER_EPOCH;
         let target_epoch = target_slot / slots_per_epoch;
@@ -184,7 +252,7 @@
         let mut same_epoch_snapshots: Vec<SnapshotInfo> = dir_entries
             .filter_map(Result::ok)
             .filter_map(|entry| SnapshotInfo::from_path(entry.path()))
-            .filter(|snap| snap.end_slot / slots_per_epoch == target_epoch)
+            .filter(|snap| snap.is_incremental() && snap.end_slot / slots_per_epoch == target_epoch)
             .collect();

         // Sort by end_slot ascending so we can remove oldest
@@ -257,9 +325,15 @@
             last_epoch_backup_path = this_epoch_backup_path;
             this_epoch_backup_path = None;
             let current_epoch = this_epoch_target_slot / DEFAULT_SLOTS_PER_EPOCH;
-            if let Err(e) = self.evict_all_epoch_snapshots(current_epoch - 2) {
+            if let Err(e) = self.evict_all_epoch_snapshots(
+                current_epoch - self.num_monitored_epochs.saturating_sub(1),
+            ) {
                 log::error!("Failed to evict old snapshots: {}", e);
             }
+            // evict all saved files
+            if let Err(e) = self.evict_saved_files(current_epoch - self.num_monitored_epochs) {
+                log::error!("Failed to evict old TipRouter saved files: {}", e);
+            }
         }

         // Backup latest snapshot for last epoch and this epoch
@@ -279,6 +353,10 @@
 mod tests {
     use std::fs::File;

+    use crate::{
+        merkle_tree_collection_file_name, meta_merkle_tree_file_name, stake_meta_file_name,
+    };
+
     use super::*;
     use std::io::Write;
     use tempfile::TempDir;
@@ -294,6 +372,8 @@ mod tests {
             temp_dir.path().to_path_buf(),
             backup_dir.path().to_path_buf(),
             None,
+            backup_dir.path().to_path_buf(),
+            3,
         );

         // The test version will use the fixed slot from cfg(test) get_target_slot
@@ -311,10 +391,19 @@ mod tests {
             .join("incremental-snapshot-100-150-hash1.tar.zst");

         let info = SnapshotInfo::from_path(path.clone()).unwrap();
-        assert_eq!(info._start_slot, 100);
+        assert_eq!(info._start_slot.unwrap(), 100);
         assert_eq!(info.end_slot, 150);
         assert_eq!(info.path, path);

+        // Full snapshot
+        let temp_dir = TempDir::new().unwrap();
+        let path = temp_dir.path().join("snapshot-323710005-hash.tar.zst");
+
+        let info = SnapshotInfo::from_path(path.clone()).unwrap();
+        assert_eq!(info._start_slot, None);
+        assert_eq!(info.end_slot, 323710005);
+        assert_eq!(info.path, path);
+
         // Test invalid cases
         assert!(SnapshotInfo::from_path(temp_dir.path().join("not-a-snapshot.txt")).is_none());
         assert!(
@@ -331,6 +420,8 @@ mod tests {
             temp_dir.path().to_path_buf(),
             temp_dir.path().to_path_buf(),
             None,
+            temp_dir.path().to_path_buf(),
+            3,
         );

         // Create test snapshot files
@@ -374,6 +465,8 @@ mod tests {
             source_dir.path().to_path_buf(),
             backup_dir.path().to_path_buf(),
             None,
+            backup_dir.path().to_path_buf(),
+            3,
         );

         // Create test snapshot with some content
@@ -412,6 +505,8 @@ mod tests {
             source_dir.path().to_path_buf(),
             backup_dir.path().to_path_buf(),
             None,
+            backup_dir.path().to_path_buf(),
+            3,
         );

         let missing_path = source_dir.path().join("nonexistent.tar.zst");
@@ -422,4 +517,87 @@ mod tests {
             .await
             .is_err());
     }
+
+    #[test]
+    fn test_evict_saved_files() {
+        let temp_dir = TempDir::new().unwrap();
+        let monitor = BackupSnapshotMonitor::new(
+            "http://localhost:8899",
+            temp_dir.path().to_path_buf(),
+            temp_dir.path().to_path_buf(),
+            None,
+            temp_dir.path().to_path_buf(),
+            3,
+        );
+        let current_epoch = 749;
+        let first_epoch = current_epoch - 5;
+
+        for i in first_epoch..current_epoch {
+            File::create(&monitor.save_path.join(stake_meta_file_name(i))).unwrap();
+            File::create(&monitor.save_path.join(merkle_tree_collection_file_name(i))).unwrap();
+            File::create(&monitor.save_path.join(meta_merkle_tree_file_name(i))).unwrap();
+        }
+        let dir_entries: Vec<PathBuf> = std::fs::read_dir(&monitor.save_path)
+            .unwrap()
+            .map(|x| x.unwrap().path())
+            .collect();
+        assert_eq!(dir_entries.len(), 5 * 3);
+
+        monitor
+            .evict_saved_files(current_epoch - monitor.num_monitored_epochs)
+            .unwrap();
+        let dir_entries: Vec<PathBuf> = std::fs::read_dir(&monitor.save_path)
+            .unwrap()
+            .map(|x| x.unwrap().path())
+            .collect();
+        assert_eq!(dir_entries.len(), 6);
+
+        // test not evicting some other similar file in the same directory
+        let file_path = monitor
+            .save_path
+            .join(format!("{first_epoch}_other_similar_file.json"));
+        let mut file = File::create(&file_path).unwrap();
+        file.write_all(b"test").unwrap();
+        monitor
+            .evict_saved_files(current_epoch - monitor.num_monitored_epochs)
+            .unwrap();
+        assert!(File::open(file_path).is_ok());
+    }
+
+    #[test]
+    fn test_evict_same_epoch_incremental() {
+        let temp_dir = TempDir::new().unwrap();
+        let monitor = BackupSnapshotMonitor::new(
+            "http://localhost:8899",
+            temp_dir.path().to_path_buf(),
+            temp_dir.path().to_path_buf(),
+            None,
+            temp_dir.path().to_path_buf(),
+            3,
+        );
+
+        // Create test snapshot files
+        let snapshots = [
+            "incremental-snapshot-100-324431477-hash1.tar.zst",
+            "incremental-snapshot-200-324431877-hash2.tar.zst",
+            "incremental-snapshot-300-324431977-hash3.tar.zst",
+            "incremental-snapshot-100-324589366-hash1.tar.zst",
+            "incremental-snapshot-200-324589866-hash2.tar.zst",
+            "incremental-snapshot-300-324590366-hash3.tar.zst",
+            "snapshot-324431977-hash.tar.zst",
+        ];
+
+        for name in snapshots.iter() {
+            let path = temp_dir.path().join(name);
+            File::create(path).unwrap();
+        }
+
+        // Test that it only keeps 3 incrementals when there's a full snapshot
+        monitor.evict_same_epoch_incremental(324431977).unwrap();
+        let dir_entries: Vec<PathBuf> = std::fs::read_dir(&monitor.backup_dir)
+            .unwrap()
+            .map(|x| x.unwrap().path())
+            .collect();
+        assert_eq!(dir_entries.len(), snapshots.len());
+    }
 }
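The eviction rule above keys off the epoch prefix in the saved file names. A self-contained sketch of that idea, using a hypothetical `evict_saved_files` free function (the real implementation additionally matches the full name against the three known file-name patterns, which is why the "other similar file" in the test survives):

```rust
use std::{fs, path::PathBuf};

// Parse the epoch prefix from "<epoch>_<name>.json" and delete anything at
// or below the cutoff epoch. Non-matching files are skipped.
fn evict_saved_files(save_path: &PathBuf, cutoff_epoch: u64) -> std::io::Result<()> {
    for entry in fs::read_dir(save_path)?.filter_map(Result::ok) {
        let path = entry.path();
        let Some(name) = path.file_name().and_then(|n| n.to_str()) else {
            continue;
        };
        // Epoch comes first, split on the underscore that follows it.
        let Some(epoch) = name.split('_').next().and_then(|p| p.parse::<u64>().ok()) else {
            continue;
        };
        if epoch <= cutoff_epoch {
            fs::remove_file(&path)?;
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    let dir = PathBuf::from(".");
    // With num_monitored_epochs = 3 and current epoch 749, everything from
    // epoch 746 and earlier is eligible for eviction.
    evict_saved_files(&dir, 749 - 3)
}
```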
"incremental-snapshot-200-324589866-hash2.tar.zst", + "incremental-snapshot-300-324590366-hash3.tar.zst", + "snapshot-324431977-hash.tar.zst", + ]; + + for name in snapshots.iter() { + let path = temp_dir.path().join(name); + File::create(path).unwrap(); + } + + // Test that it only keeps 3 incrementals when there's a full snapshot + monitor.evict_same_epoch_incremental(324431977).unwrap(); + let dir_entries: Vec = std::fs::read_dir(&monitor.backup_dir) + .unwrap() + .map(|x| x.unwrap().path()) + .collect(); + assert_eq!(dir_entries.len(), snapshots.len()); + } } diff --git a/tip-router-operator-cli/src/claim.rs b/tip-router-operator-cli/src/claim.rs index 5acc6970..116f4f67 100644 --- a/tip-router-operator-cli/src/claim.rs +++ b/tip-router-operator-cli/src/claim.rs @@ -32,6 +32,7 @@ use solana_sdk::{ use thiserror::Error; use crate::{ + merkle_tree_collection_file_name, rpc_utils::{get_batched_accounts, send_until_blockhash_expires}, Cli, }; @@ -77,10 +78,9 @@ pub async fn claim_mev_tips_with_emit( let keypair = read_keypair_file(cli.keypair_path.clone()) .map_err(|e| anyhow::anyhow!("Failed to read keypair file: {:?}", e))?; let keypair = Arc::new(keypair); - let meta_merkle_tree_dir = cli.meta_merkle_tree_dir.clone(); + let meta_merkle_tree_dir = cli.save_path.clone(); let rpc_url = cli.rpc_url.clone(); - let merkle_tree_coll_path = - meta_merkle_tree_dir.join(format!("generated_merkle_tree_{}.json", epoch)); + let merkle_tree_coll_path = meta_merkle_tree_dir.join(merkle_tree_collection_file_name(epoch)); let mut merkle_tree_coll = GeneratedMerkleTreeCollection::new_from_file(&merkle_tree_coll_path) .map_err(|e| anyhow::anyhow!(e))?; @@ -192,29 +192,35 @@ pub async fn claim_mev_tips( } all_claim_transactions.shuffle(&mut thread_rng()); - let transactions: Vec<_> = all_claim_transactions.into_iter().take(300).collect(); - // only check balance for the ones we need to currently send since reclaim rent running in parallel - if let Some((start_balance, desired_balance, sol_to_deposit)) = - is_sufficient_balance(&keypair.pubkey(), &rpc_client, transactions.len() as u64).await - { - return Err(ClaimMevError::InsufficientBalance { - desired_balance, - payer: keypair.pubkey(), - start_balance, - sol_to_deposit, - }); - } + for transactions in all_claim_transactions.chunks(2_000) { + let transactions: Vec<_> = transactions.to_vec(); + // only check balance for the ones we need to currently send since reclaim rent running in parallel + if let Some((start_balance, desired_balance, sol_to_deposit)) = + is_sufficient_balance(&keypair.pubkey(), &rpc_client, transactions.len() as u64) + .await + { + return Err(ClaimMevError::InsufficientBalance { + desired_balance, + payer: keypair.pubkey(), + start_balance, + sol_to_deposit, + }); + } - let blockhash = rpc_client.get_latest_blockhash().await?; - let _ = send_until_blockhash_expires( - &rpc_client, - &rpc_sender_client, - transactions, - blockhash, - keypair, - ) - .await; + let blockhash = rpc_client.get_latest_blockhash().await?; + if let Err(e) = send_until_blockhash_expires( + &rpc_client, + &rpc_sender_client, + transactions, + blockhash, + keypair, + ) + .await + { + info!("send_until_blockhash_expires failed: {:?}", e); + } + } } let transactions = get_claim_transactions_for_valid_unclaimed( diff --git a/tip-router-operator-cli/src/cli.rs b/tip-router-operator-cli/src/cli.rs index 735c2e40..c6af54f7 100644 --- a/tip-router-operator-cli/src/cli.rs +++ b/tip-router-operator-cli/src/cli.rs @@ -1,8 +1,11 @@ use std::path::PathBuf; use clap::Parser; 
diff --git a/tip-router-operator-cli/src/cli.rs b/tip-router-operator-cli/src/cli.rs
index 735c2e40..c6af54f7 100644
--- a/tip-router-operator-cli/src/cli.rs
+++ b/tip-router-operator-cli/src/cli.rs
@@ -1,8 +1,11 @@
 use std::path::PathBuf;

 use clap::Parser;
+use log::info;
 use solana_sdk::pubkey::Pubkey;

+use crate::OperatorState;
+
 #[derive(Clone, Parser)]
 #[command(author, version, about)]
 pub struct Cli {
@@ -27,9 +30,6 @@ pub struct Cli {
     #[arg(short, long, env)]
     pub snapshot_output_dir: PathBuf,

-    #[arg(short, long, env)]
-    pub meta_merkle_tree_dir: PathBuf,
-
     #[arg(long, env, default_value = "false")]
     pub submit_as_memo: bool,

@@ -37,10 +37,63 @@ pub struct Cli {
     #[arg(long, env, default_value_t = 1)]
     pub micro_lamports: u64,

+    #[arg(
+        long,
+        env,
+        alias = "meta-merkle-tree-dir",
+        help = "Path to save data (formerly meta-merkle-tree-dir)"
+    )]
+    pub save_path: PathBuf,
+
     #[command(subcommand)]
     pub command: Commands,
 }

+impl Cli {
+    pub fn create_save_path(&self) {
+        if !self.save_path.exists() {
+            info!(
+                "Creating Tip Router save directory at {}",
+                self.save_path.display()
+            );
+            std::fs::create_dir_all(&self.save_path).unwrap();
+        }
+    }
+
+    pub fn get_snapshot_paths(&self) -> SnapshotPaths {
+        let ledger_path = self.ledger_path.clone();
+        let account_paths = None;
+        let account_paths = account_paths.map_or_else(|| vec![ledger_path.clone()], |paths| paths);
+        let full_snapshots_path = self.full_snapshots_path.clone();
+        let full_snapshots_path = full_snapshots_path.map_or(ledger_path.clone(), |path| path);
+        let incremental_snapshots_path = self.backup_snapshots_dir.clone();
+        SnapshotPaths {
+            ledger_path,
+            account_paths,
+            full_snapshots_path,
+            incremental_snapshots_path,
+            backup_snapshots_dir: self.backup_snapshots_dir.clone(),
+        }
+    }
+
+    pub fn force_different_backup_snapshot_dir(&self) {
+        let snapshot_paths = self.get_snapshot_paths();
+        assert_ne!(
+            snapshot_paths.full_snapshots_path,
+            snapshot_paths.backup_snapshots_dir
+        );
+    }
+}
+
+pub struct SnapshotPaths {
+    pub ledger_path: PathBuf,
+    pub account_paths: Vec<PathBuf>,
+    pub full_snapshots_path: PathBuf,
+    pub incremental_snapshots_path: PathBuf,
+    /// Used when storing or loading snapshots that the operator CLI is working with
+    pub backup_snapshots_dir: PathBuf,
+}
+
 #[derive(clap::Subcommand, Clone)]
 pub enum Commands {
     Run {
@@ -56,15 +109,9 @@ pub enum Commands {
         #[arg(long, env)]
         tip_router_program_id: Pubkey,

-        #[arg(long, env, default_value = "false")]
-        enable_snapshots: bool,
-
         #[arg(long, env, default_value = "3")]
         num_monitored_epochs: u64,

-        #[arg(long, env, default_value = "false")]
-        start_next_epoch: bool,
-
         #[arg(long, env)]
         override_target_slot: Option<u64>,

         #[arg(long, env, default_value = "true")]
         set_merkle_roots: bool,

         #[arg(long, env, default_value = "false")]
         claim_tips: bool,
+
+        #[arg(long, env, default_value = "wait-for-next-epoch")]
+        starting_stage: OperatorState,
+
+        #[arg(long, env, default_value = "true")]
+        save_stages: bool,
+
+        #[arg(
+            long,
+            env,
+            alias = "enable-snapshots",
+            help = "Flag to enable storing created snapshots (formerly enable-snapshots)",
+            default_value = "false"
+        )]
+        save_snapshot: bool,
     },
     SnapshotSlot {
+        #[arg(long, env)]
+        slot: u64,
+    },
+    SubmitEpoch {
         #[arg(short, long, env)]
         ncn_address: Pubkey,

         #[arg(long, env)]
         tip_distribution_program_id: Pubkey,

         #[arg(long, env)]
-        tip_payment_program_id: Pubkey,
+        tip_router_program_id: Pubkey,

         #[arg(long, env)]
-        tip_router_program_id: Pubkey,
+        epoch: u64,

         #[arg(long, env, default_value = "false")]
-        enable_snapshots: bool,
+        set_merkle_roots: bool,
+    },
+    ClaimTips {
+        #[arg(long, env)]
+        tip_router_program_id: Pubkey,

+        /// Tip distribution program ID
         #[arg(long, env)]
-        slot: u64,
-    },
-    SubmitEpoch {
+        tip_distribution_program_id: Pubkey,
+
         #[arg(short, long, env)]
         ncn_address: Pubkey,

+        /// The epoch to Claim tips for
         #[arg(long, env)]
-        tip_distribution_program_id: Pubkey,
-
+        epoch: u64,
+    },
+    CreateStakeMeta {
         #[arg(long, env)]
-        tip_router_program_id: Pubkey,
+        slot: u64,

         #[arg(long, env)]
         epoch: u64,
-    },
-    ClaimTips {
-        /// Tip distribution program ID
+
         #[arg(long, env)]
         tip_distribution_program_id: Pubkey,

-        /// Tip router program ID
         #[arg(long, env)]
-        tip_router_program_id: Pubkey,
+        tip_payment_program_id: Pubkey,

-        /// NCN address
+        #[arg(long, env, default_value = "true")]
+        save: bool,
+    },
+    CreateMerkleTreeCollection {
         #[arg(long, env)]
+        tip_router_program_id: Pubkey,
+
+        #[arg(short, long, env)]
         ncn_address: Pubkey,

-        /// The epoch to Claim tips for
         #[arg(long, env)]
         epoch: u64,
+
+        #[arg(long, env, default_value = "true")]
+        save: bool,
+    },
+    CreateMetaMerkleTree {
+        #[arg(long, env)]
+        epoch: u64,
+
+        #[arg(long, env, default_value = "true")]
+        save: bool,
     },
 }
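The rename from `meta_merkle_tree_dir` to `save_path` keeps the old flag working through clap's `alias`. A minimal sketch of the same pattern, assuming clap with the derive and env features:

```rust
use std::path::PathBuf;

use clap::Parser;

#[derive(Parser)]
struct Args {
    // `--save-path` is the new flag; `alias` keeps the old
    // `--meta-merkle-tree-dir` spelling working for existing deployments.
    #[arg(
        long,
        env,
        alias = "meta-merkle-tree-dir",
        help = "Path to save data (formerly meta-merkle-tree-dir)"
    )]
    save_path: PathBuf,
}

fn main() {
    // Both spellings parse into the same field.
    let a = Args::parse_from(["demo", "--save-path", "/tmp/tr"]);
    let b = Args::parse_from(["demo", "--meta-merkle-tree-dir", "/tmp/tr"]);
    assert_eq!(a.save_path, b.save_path);
}
```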
diff --git a/tip-router-operator-cli/src/ledger_utils.rs b/tip-router-operator-cli/src/ledger_utils.rs
index c2ac3c82..b1df306b 100644
--- a/tip-router-operator-cli/src/ledger_utils.rs
+++ b/tip-router-operator-cli/src/ledger_utils.rs
@@ -9,7 +9,9 @@ use std::{
 use clap_old::ArgMatches;
 use log::{info, warn};
-use solana_accounts_db::hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE};
+use solana_accounts_db::hardened_unpack::{
+    open_genesis_config, OpenGenesisConfigError, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
+};
 use solana_ledger::{
     blockstore::{Blockstore, BlockstoreError},
     blockstore_options::{AccessType, BlockstoreOptions},
@@ -17,30 +19,46 @@ use solana_ledger::{
 };
 use solana_metrics::{datapoint_error, datapoint_info};
 use solana_runtime::{
-    bank::Bank, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_bank_utils,
-    snapshot_config::SnapshotConfig, snapshot_utils::SnapshotVersion,
+    bank::Bank,
+    snapshot_archive_info::SnapshotArchiveInfoGetter,
+    snapshot_bank_utils,
+    snapshot_config::SnapshotConfig,
+    snapshot_utils::{self, get_full_snapshot_archives, SnapshotError, SnapshotVersion},
 };
-use solana_sdk::{clock::Slot, pubkey::Pubkey};
+use solana_sdk::clock::Slot;
+use thiserror::Error;

 use crate::{arg_matches, load_and_process_ledger};

+#[derive(Error, Debug)]
+pub enum LedgerUtilsError {
+    #[error("BankFromSnapshot error: {0}")]
+    BankFromSnapshotError(#[from] SnapshotError),
+    #[error("Missing snapshot at slot {0}")]
+    MissingSnapshotAtSlot(u64),
+    #[error("OpenGenesisConfig error: {0}")]
+    OpenGenesisConfigError(#[from] OpenGenesisConfigError),
+}
+
 // TODO: Use Result and propagate errors more gracefully
 /// Create the Bank for a desired slot for given file paths.
+#[allow(clippy::cognitive_complexity, clippy::too_many_arguments)]
 pub fn get_bank_from_ledger(
-    operator_address: &Pubkey,
+    operator_address: String,
     ledger_path: &Path,
     account_paths: Vec<PathBuf>,
     full_snapshots_path: PathBuf,
     incremental_snapshots_path: PathBuf,
     desired_slot: &Slot,
-    take_snapshot: bool,
+    save_snapshot: bool,
+    snapshot_save_path: PathBuf,
 ) -> Arc<Bank> {
     let start_time = Instant::now();

     // Start validation
     datapoint_info!(
         "tip_router_cli.get_bank",
-        ("operator", operator_address.to_string(), String),
+        ("operator", operator_address, String),
         ("state", "validate_path_start", String),
         ("step", 0, i64),
     );
@@ -49,7 +67,7 @@ pub fn get_bank_from_ledger(

     datapoint_info!(
         "tip_router_cli.get_bank",
-        ("operator", operator_address.to_string(), String),
+        ("operator", operator_address, String),
         ("state", "load_genesis_start", String),
         ("step", 1, i64),
         ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
@@ -60,7 +78,7 @@ pub fn get_bank_from_ledger(
         Err(e) => {
             datapoint_error!(
                 "tip_router_cli.get_bank",
-                ("operator", operator_address.to_string(), String),
+                ("operator", operator_address, String),
                 ("status", "error", String),
                 ("state", "load_genesis", String),
                 ("step", 1, i64),
@@ -74,7 +92,7 @@ pub fn get_bank_from_ledger(

     datapoint_info!(
         "tip_router_cli.get_bank",
-        ("operator", operator_address.to_string(), String),
+        ("operator", operator_address, String),
         ("state", "load_blockstore_start", String),
         ("step", 2, i64),
         ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
@@ -119,7 +137,7 @@ pub fn get_bank_from_ledger(
         };
         datapoint_error!(
             "tip_router_cli.get_bank",
-            ("operator", operator_address.to_string(), String),
+            ("operator", operator_address, String),
             ("status", "error", String),
             ("state", "load_blockstore", String),
             ("step", 2, i64),
@@ -132,7 +150,7 @@ pub fn get_bank_from_ledger(
         let error_str = format!("Failed to open blockstore at {ledger_path:?}: {err:?}");
         datapoint_error!(
             "tip_router_cli.get_bank",
-            ("operator", operator_address.to_string(), String),
+            ("operator", operator_address, String),
             ("status", "error", String),
             ("state", "load_blockstore", String),
             ("step", 2, i64),
@@ -159,7 +177,7 @@ pub fn get_bank_from_ledger(

     datapoint_info!(
         "tip_router_cli.get_bank",
-        ("operator", operator_address.to_string(), String),
+        ("operator", operator_address, String),
         ("state", "load_snapshot_config_start", String),
         ("step", 3, i64),
         ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
@@ -176,6 +194,59 @@ pub fn get_bank_from_ledger(
         halt_at_slot: Some(desired_slot.to_owned()),
         ..Default::default()
     };
+
+    let mut starting_slot = 0; // default start check with genesis
+    if let Some(full_snapshot_slot) = snapshot_utils::get_highest_full_snapshot_archive_slot(
+        &full_snapshots_path,
+        process_options.halt_at_slot,
+    ) {
+        let incremental_snapshot_slot =
+            snapshot_utils::get_highest_incremental_snapshot_archive_slot(
+                &incremental_snapshots_path,
+                full_snapshot_slot,
+                process_options.halt_at_slot,
+            )
+            .unwrap_or_default();
+        starting_slot = std::cmp::max(full_snapshot_slot, incremental_snapshot_slot);
+    }
+    info!("Starting slot {}", starting_slot);
+
+    match process_options.halt_at_slot {
+        // Skip the following checks for sentinel values of Some(0) and None.
+        // For Some(0), no slots will be replayed after starting_slot.
+        // For None, all available children of starting_slot will be replayed.
+        None | Some(0) => {}
+        Some(halt_slot) => {
+            if halt_slot < starting_slot {
+                let error_str = String::from("halt_slot < starting_slot");
+                datapoint_error!(
+                    "tip_router_cli.get_bank",
+                    ("operator", operator_address, String),
+                    ("status", "error", String),
+                    ("state", "load_blockstore", String),
+                    ("step", 2, i64),
+                    ("error", error_str, String),
+                    ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
+                );
+                panic!("{}", error_str);
+            }
+            // Check if we have the slot data necessary to replay from starting_slot to >= halt_slot.
+            if !blockstore.slot_range_connected(starting_slot, halt_slot) {
+                let error_str =
+                    format!("Blockstore missing data to replay to slot {}", desired_slot);
+                datapoint_error!(
+                    "tip_router_cli.get_bank",
+                    ("operator", operator_address, String),
+                    ("status", "error", String),
+                    ("state", "load_blockstore", String),
+                    ("step", 2, i64),
+                    ("error", error_str, String),
+                    ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
+                );
+                panic!("{}", error_str);
+            }
+        }
+    }

     let exit = Arc::new(AtomicBool::new(false));
     let mut arg_matches = ArgMatches::new();
@@ -195,13 +266,13 @@ pub fn get_bank_from_ledger(
         process_options,
         Some(full_snapshots_path),
         Some(incremental_snapshots_path),
-        operator_address,
+        operator_address.clone(),
     ) {
         Ok(res) => res,
         Err(e) => {
             datapoint_error!(
                 "tip_router_cli.get_bank",
-                ("operator", operator_address.to_string(), String),
+                ("operator", operator_address, String),
                 ("state", "load_bank_forks", String),
                 ("status", "error", String),
                 ("step", 4, i64),
@@ -282,7 +353,7 @@ pub fn get_bank_from_ledger(

     datapoint_info!(
         "tip_router_cli.get_bank",
-        ("operator", operator_address.to_string(), String),
+        ("operator", operator_address, String),
         ("state", "bank_to_full_snapshot_archive_start", String),
         ("bank_hash", working_bank.hash().to_string(), String),
         ("step", 5, i64),
@@ -291,12 +362,14 @@ pub fn get_bank_from_ledger(

     exit.store(true, Ordering::Relaxed);

-    if take_snapshot {
+    if save_snapshot {
         let full_snapshot_archive_info = match snapshot_bank_utils::bank_to_full_snapshot_archive(
             ledger_path,
             &working_bank,
             Some(SnapshotVersion::default()),
-            snapshot_config.full_snapshot_archives_dir,
+            // Use the snapshot_save_path path so the snapshot is stored in a directory different
+            // than the node's primary snapshot directory
+            snapshot_save_path,
             snapshot_config.incremental_snapshot_archives_dir,
             snapshot_config.archive_format,
         ) {
@@ -304,7 +377,7 @@ pub fn get_bank_from_ledger(
             Err(e) => {
                 datapoint_error!(
                     "tip_router_cli.get_bank",
-                    ("operator", operator_address.to_string(), String),
+                    ("operator", operator_address, String),
                     ("status", "error", String),
                     ("state", "bank_to_full_snapshot_archive", String),
                     ("step", 6, i64),
@@ -334,7 +407,7 @@ pub fn get_bank_from_ledger(

     datapoint_info!(
         "tip_router_cli.get_bank",
-        ("operator", operator_address.to_string(), String),
+        ("operator", operator_address, String),
         ("state", "get_bank_from_ledger_success", String),
         ("step", 6, i64),
         ("duration_ms", start_time.elapsed().as_millis() as i64, i64),
@@ -342,12 +415,97 @@ pub fn get_bank_from_ledger(
     working_bank
 }
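The new pre-flight check boils down to two pure rules: the replay start is the highest full snapshot slot, bumped by any newer incremental snapshot, and a requested halt slot below that start can never be reached. A standalone sketch of just that logic:

```rust
// Replay starts at the newest usable snapshot; incremental snapshots only
// count when a full snapshot exists to base them on.
fn starting_slot(full: Option<u64>, incremental: Option<u64>) -> u64 {
    match full {
        Some(full_slot) => full_slot.max(incremental.unwrap_or_default()),
        None => 0, // default: start checking from genesis
    }
}

fn validate_halt_slot(halt_at_slot: Option<u64>, start: u64) -> Result<(), String> {
    match halt_at_slot {
        // Sentinels: Some(0) replays nothing, None replays everything available.
        None | Some(0) => Ok(()),
        Some(halt) if halt < start => Err("halt_slot < starting_slot".to_string()),
        // The real code additionally checks blockstore.slot_range_connected.
        Some(_) => Ok(()),
    }
}

fn main() {
    assert_eq!(starting_slot(Some(100), Some(150)), 150);
    assert_eq!(starting_slot(None, Some(150)), 0);
    assert!(validate_halt_slot(Some(120), 150).is_err());
    assert!(validate_halt_slot(None, 150).is_ok());
}
```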
+/// Loads the bank from the snapshot at the exact slot. If the snapshot doesn't exist, result is
+/// an error.
+pub fn get_bank_from_snapshot_at_slot(
+    snapshot_slot: u64,
+    full_snapshots_path: &PathBuf,
+    bank_snapshots_dir: &PathBuf,
+    account_paths: Vec<PathBuf>,
+    ledger_path: &Path,
+) -> Result<Bank, LedgerUtilsError> {
+    let mut full_snapshot_archives = get_full_snapshot_archives(full_snapshots_path);
+    full_snapshot_archives.retain(|archive| archive.snapshot_archive_info().slot == snapshot_slot);
+
+    if full_snapshot_archives.len() != 1 {
+        return Err(LedgerUtilsError::MissingSnapshotAtSlot(snapshot_slot));
+    }
+    let full_snapshot_archive_info = full_snapshot_archives.first().expect("unreachable");
+    let process_options = ProcessOptions {
+        halt_at_slot: Some(snapshot_slot.to_owned()),
+        ..Default::default()
+    };
+    let genesis_config = match open_genesis_config(ledger_path, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE) {
+        Ok(genesis_config) => genesis_config,
+        Err(e) => return Err(e.into()),
+    };
+    let exit = Arc::new(AtomicBool::new(false));
+
+    let (bank, _) = snapshot_bank_utils::bank_from_snapshot_archives(
+        &account_paths,
+        bank_snapshots_dir,
+        full_snapshot_archive_info,
+        None,
+        &genesis_config,
+        &process_options.runtime_config,
+        process_options.debug_keys.clone(),
+        None,
+        process_options.limit_load_slot_count_from_snapshot,
+        process_options.accounts_db_test_hash_calculation,
+        process_options.accounts_db_skip_shrink,
+        process_options.accounts_db_force_initial_clean,
+        process_options.verify_index,
+        process_options.accounts_db_config.clone(),
+        None,
+        exit.clone(),
+    )?;
+    exit.store(true, Ordering::Relaxed);
+    Ok(bank)
+}
+
 #[cfg(test)]
 mod tests {

     use crate::load_and_process_ledger::LEDGER_TOOL_DIRECTORY;
+    use solana_sdk::pubkey::Pubkey;
+
     use super::*;

+    #[test]
+    fn test_get_bank_from_snapshot_at_slot() {
+        let ledger_path = PathBuf::from("./tests/fixtures/test-ledger");
+        let account_paths = vec![ledger_path.join("accounts/run")];
+        let full_snapshots_path = ledger_path.clone();
+        let snapshot_slot = 100;
+        let bank = get_bank_from_snapshot_at_slot(
+            snapshot_slot,
+            &full_snapshots_path,
+            &full_snapshots_path,
+            account_paths,
+            &ledger_path.as_path(),
+        )
+        .unwrap();
+        assert_eq!(bank.slot(), snapshot_slot);
+    }
+
+    #[test]
+    fn test_get_bank_from_snapshot_at_slot_snapshot_missing_error() {
+        let ledger_path = PathBuf::from("./tests/fixtures/test-ledger");
+        let account_paths = vec![ledger_path.join("accounts/run")];
+        let full_snapshots_path = ledger_path.clone();
+        let snapshot_slot = 105;
+        let res = get_bank_from_snapshot_at_slot(
+            snapshot_slot,
+            &full_snapshots_path,
+            &full_snapshots_path,
+            account_paths,
+            &ledger_path.as_path(),
+        );
+        assert!(res.is_err());
+        let expected_err_str = format!("Missing snapshot at slot {}", snapshot_slot);
+        assert_eq!(res.err().unwrap().to_string(), expected_err_str);
+    }
+
     #[test]
     fn test_get_bank_from_ledger_success() {
         let operator_address = Pubkey::new_unique();
@@ -356,13 +514,14 @@ mod tests {
         let full_snapshots_path = ledger_path.clone();
         let desired_slot = 144;
         let res = get_bank_from_ledger(
-            &operator_address,
+            operator_address.to_string(),
             &ledger_path,
             account_paths,
             full_snapshots_path.clone(),
             full_snapshots_path.clone(),
             &desired_slot,
             true,
+            full_snapshots_path.clone(),
        );
         assert_eq!(res.slot(), desired_slot);
         // Assert that the snapshot was created
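The selection rule in `get_bank_from_snapshot_at_slot` is strict: keep only archives whose slot matches exactly, and demand exactly one match. A trimmed sketch of that rule with a stand-in `Archive` type:

```rust
#[derive(Debug)]
struct Archive {
    slot: u64,
}

// Mirrors the retain + length check above; the error text matches
// LedgerUtilsError::MissingSnapshotAtSlot.
fn pick_exact(mut archives: Vec<Archive>, snapshot_slot: u64) -> Result<Archive, String> {
    archives.retain(|a| a.slot == snapshot_slot);
    if archives.len() != 1 {
        return Err(format!("Missing snapshot at slot {snapshot_slot}"));
    }
    Ok(archives.remove(0))
}

fn main() {
    let archives = vec![Archive { slot: 100 }, Archive { slot: 144 }];
    // Slot 105 has no exact-match archive, so this is an error by design.
    assert!(pick_exact(archives, 105).is_err());
}
```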
diff --git a/tip-router-operator-cli/src/lib.rs b/tip-router-operator-cli/src/lib.rs
index 98786f65..965375cb 100644
--- a/tip-router-operator-cli/src/lib.rs
+++ b/tip-router-operator-cli/src/lib.rs
@@ -3,203 +3,181 @@ pub mod ledger_utils;
 pub mod stake_meta_generator;
 pub mod tip_router;
 pub use crate::cli::{Cli, Commands};
-pub mod claim;
-pub mod cli;
-pub use crate::process_epoch::process_epoch;
 pub mod arg_matches;
 pub mod backup_snapshots;
+pub mod claim;
+pub mod cli;
 pub mod load_and_process_ledger;
 pub mod process_epoch;
 pub mod rpc_utils;
 pub mod submit;

-use std::fs::{self, File};
-use std::io::{BufWriter, Write};
+use std::fs;
 use std::path::{Path, PathBuf};
 use std::process::Command;
+use std::sync::Arc;
 use std::time::Instant;

 use anchor_lang::prelude::*;
+use cli::SnapshotPaths;
 use jito_tip_distribution_sdk::TipDistributionAccount;
 use jito_tip_payment_sdk::{
     CONFIG_ACCOUNT_SEED, TIP_ACCOUNT_SEED_0, TIP_ACCOUNT_SEED_1, TIP_ACCOUNT_SEED_2,
     TIP_ACCOUNT_SEED_3, TIP_ACCOUNT_SEED_4, TIP_ACCOUNT_SEED_5, TIP_ACCOUNT_SEED_6,
     TIP_ACCOUNT_SEED_7,
 };
-use log::{error, info};
-use meta_merkle_tree::generated_merkle_tree::MerkleRootGeneratorError;
+use ledger_utils::get_bank_from_ledger;
+use log::info;
+use meta_merkle_tree::generated_merkle_tree::StakeMetaCollection;
 use meta_merkle_tree::{
     generated_merkle_tree::GeneratedMerkleTreeCollection, meta_merkle_tree::MetaMerkleTree,
 };
 use solana_metrics::{datapoint_error, datapoint_info};
-use solana_sdk::{account::AccountSharedData, pubkey::Pubkey, slot_history::Slot};
-
-#[derive(Debug)]
-pub enum MerkleRootError {
-    StakeMetaGeneratorError(String),
-    MerkleRootGeneratorError(String),
-    MerkleTreeError(String),
+use solana_runtime::bank::Bank;
+use solana_sdk::{account::AccountSharedData, pubkey::Pubkey};
+use stake_meta_generator::generate_stake_meta_collection;
+
+#[derive(clap::ValueEnum, Clone, Copy, Debug)]
+pub enum OperatorState {
+    // Allows the operator to load from a snapshot created externally
+    LoadBankFromSnapshot,
+    CreateStakeMeta,
+    CreateMerkleTreeCollection,
+    CreateMetaMerkleTree,
+    CastVote,
+    WaitForNextEpoch,
 }
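`OperatorState` names the checkpoints the Run command can resume from. `loop_stages` itself is not shown in this diff, so the following is only an illustrative toy of the enum's intent: each stage feeds the next, parking at `WaitForNextEpoch` before starting over.

```rust
#[derive(Clone, Copy, Debug)]
enum OperatorState {
    LoadBankFromSnapshot,
    CreateStakeMeta,
    CreateMerkleTreeCollection,
    CreateMetaMerkleTree,
    CastVote,
    WaitForNextEpoch,
}

// Hypothetical stage-transition function; the real loop also threads the
// bank, stake meta, and merkle tree artifacts between stages.
fn next(stage: OperatorState) -> OperatorState {
    use OperatorState::*;
    match stage {
        LoadBankFromSnapshot => CreateStakeMeta,
        CreateStakeMeta => CreateMerkleTreeCollection,
        CreateMerkleTreeCollection => CreateMetaMerkleTree,
        CreateMetaMerkleTree => CastVote,
        CastVote => WaitForNextEpoch,
        WaitForNextEpoch => LoadBankFromSnapshot,
    }
}

fn main() {
    let mut stage = OperatorState::LoadBankFromSnapshot;
    for _ in 0..6 {
        println!("{:?}", stage);
        stage = next(stage);
    }
}
```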
-// TODO where did these come from?
-pub struct TipPaymentPubkeys {
-    _config_pda: Pubkey,
-    tip_pdas: Vec<Pubkey>,
+pub fn stake_meta_file_name(epoch: u64) -> String {
+    format!("{}_stake_meta_collection.json", epoch)
 }

-#[derive(Clone, Debug, AnchorSerialize, AnchorDeserialize)]
-pub struct TipAccountConfig {
-    pub authority: Pubkey,
-    pub protocol_fee_bps: u64,
-    pub bump: u8,
+pub fn merkle_tree_collection_file_name(epoch: u64) -> String {
+    format!("{}_merkle_tree_collection.json", epoch)
 }

-fn derive_tip_payment_pubkeys(program_id: &Pubkey) -> TipPaymentPubkeys {
-    let config_pda = Pubkey::find_program_address(&[CONFIG_ACCOUNT_SEED], program_id).0;
-    let tip_pda_0 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_0], program_id).0;
-    let tip_pda_1 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_1], program_id).0;
-    let tip_pda_2 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_2], program_id).0;
-    let tip_pda_3 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_3], program_id).0;
-    let tip_pda_4 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_4], program_id).0;
-    let tip_pda_5 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_5], program_id).0;
-    let tip_pda_6 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_6], program_id).0;
-    let tip_pda_7 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_7], program_id).0;
-
-    TipPaymentPubkeys {
-        _config_pda: config_pda,
-        tip_pdas: vec![
-            tip_pda_0, tip_pda_1, tip_pda_2, tip_pda_3, tip_pda_4, tip_pda_5, tip_pda_6, tip_pda_7,
-        ],
-    }
+pub fn meta_merkle_tree_file_name(epoch: u64) -> String {
+    format!("{}_meta_merkle_tree.json", epoch)
 }
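The later "epoch comes before the file name" note explains these helpers: the numeric prefix keeps directory listings grouped by epoch. A quick check that lexicographic order matches epoch order for the generated names (which holds while epochs share the same digit count):

```rust
fn stake_meta_file_name(epoch: u64) -> String {
    format!("{}_stake_meta_collection.json", epoch)
}

fn main() {
    let mut names: Vec<String> = (748..=750).map(stake_meta_file_name).collect();
    names.sort();
    assert_eq!(
        names,
        vec![
            "748_stake_meta_collection.json",
            "749_stake_meta_collection.json",
            "750_stake_meta_collection.json",
        ]
    );
}
```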
-fn write_to_json_file(
-    merkle_tree_coll: &GeneratedMerkleTreeCollection,
-    file_path: &PathBuf,
-) -> std::result::Result<(), MerkleRootGeneratorError> {
-    let file = File::create(file_path)?;
-    let mut writer = BufWriter::new(file);
-    let json = serde_json::to_string_pretty(&merkle_tree_coll).unwrap();
-    writer.write_all(json.as_bytes())?;
-    writer.flush()?;
-
-    Ok(())
-}
+// STAGE 1 LoadBankFromSnapshot
+pub fn load_bank_from_snapshot(cli: Cli, slot: u64, save_snapshot: bool) -> Arc<Bank> {
+    let SnapshotPaths {
+        ledger_path,
+        account_paths,
+        full_snapshots_path,
+        incremental_snapshots_path: _,
+        backup_snapshots_dir,
+    } = cli.get_snapshot_paths();

-/// Convenience wrapper around [TipDistributionAccount]
-pub struct TipDistributionAccountWrapper {
-    pub tip_distribution_account: TipDistributionAccount,
-    pub account_data: AccountSharedData,
-    pub tip_distribution_pubkey: Pubkey,
+    get_bank_from_ledger(
+        cli.operator_address,
+        &ledger_path,
+        account_paths,
+        full_snapshots_path,
+        backup_snapshots_dir.clone(),
+        &slot,
+        save_snapshot,
+        backup_snapshots_dir,
+    )
 }

-#[allow(clippy::too_many_arguments)]
-pub fn get_meta_merkle_root(
-    ledger_path: &Path,
-    account_paths: Vec<PathBuf>,
-    full_snapshots_path: PathBuf,
-    incremental_snapshots_path: PathBuf,
-    desired_slot: &Slot,
+// STAGE 2 CreateStakeMeta
+pub fn create_stake_meta(
+    operator_address: String,
+    epoch: u64,
+    bank: &Arc<Bank>,
     tip_distribution_program_id: &Pubkey,
-    out_path: &str,
     tip_payment_program_id: &Pubkey,
-    tip_router_program_id: &Pubkey,
-    ncn_address: &Pubkey,
-    operator_address: &Pubkey,
-    epoch: u64,
-    protocol_fee_bps: u64,
-    snapshots_enabled: bool,
-    meta_merkle_tree_dir: &Path,
-) -> std::result::Result<MetaMerkleTree, MerkleRootError> {
+    save_path: &Path,
+    save: bool,
+) -> StakeMetaCollection {
     let start = Instant::now();

-    datapoint_info!(
-        "tip_router_cli.get_meta_merkle_root",
-        ("operator_address", operator_address.to_string(), String),
-        ("state", "stake_meta_generation", String),
-        ("step", 1, i64),
-        ("epoch", epoch, i64),
-        ("duration_ms", start.elapsed().as_millis() as i64, i64)
-    );
-
-    // cleanup tmp files - update with path where stake meta is written
-    match cleanup_tmp_files(&incremental_snapshots_path) {
-        Ok(_) => {}
+    info!("Generating stake_meta_collection object...");
+    let stake_meta_coll = match generate_stake_meta_collection(
+        bank,
+        tip_distribution_program_id,
+        tip_payment_program_id,
+    ) {
+        Ok(stake_meta) => stake_meta,
         Err(e) => {
-            datapoint_info!(
-                "tip_router_cli.get_meta_merkle_root",
-                ("operator_address", operator_address.to_string(), String),
-                ("state", "cleanup_tmp_files", String),
-                ("error", format!("{:?}", e), String),
+            let error_str = format!("{:?}", e);
+            datapoint_error!(
+                "tip_router_cli.process_epoch",
+                ("operator_address", operator_address, String),
                 ("epoch", epoch, i64),
+                ("status", "error", String),
+                ("error", error_str, String),
+                ("state", "stake_meta_generation", String),
                 ("duration_ms", start.elapsed().as_millis() as i64, i64)
             );
+            panic!("{}", error_str);
         }
-    }
-
-    // Get stake meta collection
-    let stake_meta_collection = stake_meta_generator::generate_stake_meta(
-        operator_address,
-        ledger_path,
-        account_paths,
-        full_snapshots_path,
-        incremental_snapshots_path.clone(),
-        desired_slot,
-        tip_distribution_program_id,
-        out_path,
-        tip_payment_program_id,
-        snapshots_enabled,
-    )
-    .map_err(|e| {
-        MerkleRootError::StakeMetaGeneratorError(format!("Failed to generate stake meta: {:?}", e))
-    })?;
+    };

     info!(
         "Created StakeMetaCollection:\n - epoch: {:?}\n - slot: {:?}\n - num stake metas: {:?}\n - bank_hash: {:?}",
-        stake_meta_collection.epoch,
-        stake_meta_collection.slot,
-        stake_meta_collection.stake_metas.len(),
-        stake_meta_collection.bank_hash
+        stake_meta_coll.epoch,
+        stake_meta_coll.slot,
+        stake_meta_coll.stake_metas.len(),
+        stake_meta_coll.bank_hash
     );

+    if save {
+        // Note: We have the epoch come before the file name so ordering is neat on a machine
+        // with multiple epochs saved.
+        let file = save_path.join(stake_meta_file_name(epoch));
+        stake_meta_coll.write_to_file(&file);
+    }
     datapoint_info!(
         "tip_router_cli.get_meta_merkle_root",
-        ("operator_address", operator_address.to_string(), String),
-        ("state", "generated_merkle_tree_collection", String),
+        ("operator_address", operator_address, String),
+        ("state", "create_stake_meta", String),
         ("step", 2, i64),
-        ("epoch", epoch, i64),
+        ("epoch", stake_meta_coll.epoch, i64),
         ("duration_ms", start.elapsed().as_millis() as i64, i64)
     );
+    stake_meta_coll
+}

-    // Cleanup tmp files
-    match cleanup_tmp_files(&incremental_snapshots_path) {
-        Ok(_) => {}
-        Err(e) => {
-            datapoint_info!(
-                "tip_router_cli.get_meta_merkle_root",
-                ("operator_address", operator_address.to_string(), String),
-                ("state", "cleanup_tmp_files", String),
-                ("error", format!("{:?}", e), String),
-                ("epoch", epoch, i64),
-                ("duration_ms", start.elapsed().as_millis() as i64, i64)
-            );
-        }
-    }
+// STAGE 3 CreateMerkleTreeCollection
+#[allow(clippy::too_many_arguments)]
+pub fn create_merkle_tree_collection(
+    operator_address: String,
+    tip_router_program_id: &Pubkey,
+    stake_meta_collection: StakeMetaCollection,
+    epoch: u64,
+    ncn_address: &Pubkey,
+    protocol_fee_bps: u64,
+    save_path: &Path,
+    save: bool,
+) -> GeneratedMerkleTreeCollection {
+    let start = Instant::now();

     // Generate merkle tree collection
-    let merkle_tree_coll = GeneratedMerkleTreeCollection::new_from_stake_meta_collection(
+    let merkle_tree_coll = match GeneratedMerkleTreeCollection::new_from_stake_meta_collection(
         stake_meta_collection,
         ncn_address,
         epoch,
         protocol_fee_bps,
         tip_router_program_id,
-    )
-    .map_err(|_| {
-        MerkleRootError::MerkleRootGeneratorError(
-            "Failed to generate merkle tree collection".to_string(),
-        )
-    })?;
-
+    ) {
+        Ok(merkle_tree_coll) => merkle_tree_coll,
+        Err(e) => {
+            let error_str = format!("{:?}", e);
+            datapoint_error!(
+                "tip_router_cli.create_merkle_tree_collection",
+                ("operator_address", operator_address, String),
+                ("epoch", epoch, i64),
+                ("status", "error", String),
+                ("error", error_str, String),
+                ("state", "merkle_tree_generation", String),
+                ("duration_ms", start.elapsed().as_millis() as i64, i64)
+            );
+            panic!("{}", error_str);
+        }
+    };
     info!(
         "Created GeneratedMerkleTreeCollection:\n - epoch: {:?}\n - slot: {:?}\n - num generated merkle trees: {:?}\n - bank_hash: {:?}",
         merkle_tree_coll.epoch,
@@ -208,99 +186,157 @@ pub fn get_meta_merkle_root(
         merkle_tree_coll.bank_hash
     );

-    // Write GeneratedMerkleTreeCollection to file for debugging/verification
-    let generated_merkle_tree_path = incremental_snapshots_path.join(format!(
-        "generated_merkle_tree_{}.json",
-        merkle_tree_coll.epoch
-    ));
-    match write_to_json_file(&merkle_tree_coll, &generated_merkle_tree_path) {
-        Ok(_) => {
-            info!(
-                "Wrote GeneratedMerkleTreeCollection to {}",
-                generated_merkle_tree_path.display()
-            );
-        }
-        Err(e) => {
-            error!(
-                "Failed to write GeneratedMerkleTreeCollection to file {}: {:?}",
-                generated_merkle_tree_path.display(),
-                e
-            );
-        }
-    }
-
     datapoint_info!(
-        "tip_router_cli.get_meta_merkle_root",
-        ("operator_address", operator_address.to_string(), String),
+        "tip_router_cli.create_merkle_tree_collection",
+        ("operator_address", operator_address, String),
         ("state", "meta_merkle_tree_creation", String),
         ("step", 3, i64),
         ("epoch", epoch, i64),
         ("duration_ms", start.elapsed().as_millis() as i64, i64)
     );

-    // TODO: Hide this behind a flag when the process gets split up into the various stages and
-    // checkpoints.
-
-    // Write GeneratedMerkleTreeCollection to disk. Required for Claiming
-    let merkle_tree_coll_path =
-        meta_merkle_tree_dir.join(format!("generated_merkle_tree_{}.json", epoch));
-    let generated_merkle_tree_col_json = match serde_json::to_string(&merkle_tree_coll) {
-        Ok(json) => json,
-        Err(e) => {
-            datapoint_error!(
-                "tip_router_cli.process_epoch",
-                ("operator_address", operator_address.to_string(), String),
-                ("epoch", epoch, i64),
-                ("status", "error", String),
-                ("error", format!("{:?}", e), String),
-                ("state", "merkle_root_serialization", String),
-                ("duration_ms", start.elapsed().as_millis() as i64, i64)
-            );
-            return Err(MerkleRootError::MerkleRootGeneratorError(
-                "Failed to serialize merkle tree collection".to_string(),
-            ));
+    if save {
+        // Note: We have the epoch come before the file name so ordering is neat on a machine
+        // with multiple epochs saved.
+        let file = save_path.join(merkle_tree_collection_file_name(epoch));
+        match merkle_tree_coll.write_to_file(&file) {
+            Ok(_) => {}
+            Err(e) => {
+                let error_str = format!("{:?}", e);
+                datapoint_error!(
+                    "tip_router_cli.create_merkle_tree_collection",
+                    ("operator_address", operator_address, String),
+                    ("epoch", epoch, i64),
+                    ("status", "error", String),
+                    ("error", error_str, String),
+                    ("state", "merkle_root_file_write", String),
+                    ("duration_ms", start.elapsed().as_millis() as i64, i64)
+                );
+                panic!("{:?}", e);
+            }
         }
-    };
-
-    if let Err(e) = std::fs::write(merkle_tree_coll_path, generated_merkle_tree_col_json) {
-        datapoint_error!(
-            "tip_router_cli.process_epoch",
-            ("operator_address", operator_address.to_string(), String),
-            ("epoch", epoch, i64),
-            ("status", "error", String),
-            ("error", format!("{:?}", e), String),
-            ("state", "merkle_root_file_write", String),
-            ("duration_ms", start.elapsed().as_millis() as i64, i64)
-        );
-        // TODO: propogate error
-        return Err(MerkleRootError::MerkleRootGeneratorError(
-            "Failed to write meta merkle tree to file".to_string(),
-        ));
     }
+    datapoint_info!(
+        "tip_router_cli.create_merkle_tree_collection",
+        ("operator_address", operator_address, String),
+        ("state", "meta_merkle_tree_creation", String),
+        ("step", 3, i64),
+        ("epoch", epoch, i64),
+        ("duration_ms", start.elapsed().as_millis() as i64, i64)
+    );
+    merkle_tree_coll
+}

-    // Convert to MetaMerkleTree
-    let meta_merkle_tree = MetaMerkleTree::new_from_generated_merkle_tree_collection(
-        merkle_tree_coll,
-    )
-    .map_err(|e| {
-        MerkleRootError::MerkleTreeError(format!("Failed to create meta merkle tree: {:?}", e))
-    })?;
+// STAGE 4 CreateMetaMerkleTree
+pub fn create_meta_merkle_tree(
+    operator_address: String,
+    merkle_tree_collection: GeneratedMerkleTreeCollection,
+    epoch: u64,
+    save_path: &Path,
+    save: bool,
+) -> MetaMerkleTree {
+    let start = Instant::now();
+    let meta_merkle_tree =
+        match MetaMerkleTree::new_from_generated_merkle_tree_collection(merkle_tree_collection) {
+            Ok(meta_merkle_tree) => meta_merkle_tree,
+            Err(e) => {
+                let error_str = format!("{:?}", e);
+                datapoint_error!(
+                    "tip_router_cli.create_meta_merkle_tree",
+                    ("operator_address", operator_address, String),
+                    ("epoch", epoch, i64),
+                    ("status", "error", String),
+                    ("error", error_str, String),
+                    ("state", "merkle_tree_generation", String),
+                    ("duration_ms", start.elapsed().as_millis() as i64, i64)
+                );
+                panic!("{}", error_str);
+            }
+        };

     info!(
         "Created MetaMerkleTree:\n - num nodes: {:?}\n - merkle root: {:?}",
         meta_merkle_tree.num_nodes, meta_merkle_tree.merkle_root
     );

+    if save {
+        // Note: We have the epoch come before the file name so ordering is neat on a machine
+        // with multiple epochs saved.
+        let file = save_path.join(meta_merkle_tree_file_name(epoch));
+        match meta_merkle_tree.write_to_file(&file) {
+            Ok(_) => {}
+            Err(e) => {
+                let error_str = format!("{:?}", e);
+                datapoint_error!(
+                    "tip_router_cli.create_meta_merkle_tree",
+                    ("operator_address", operator_address, String),
+                    ("epoch", epoch, i64),
+                    ("status", "error", String),
+                    ("error", error_str, String),
+                    ("state", "merkle_root_file_write", String),
+                    ("duration_ms", start.elapsed().as_millis() as i64, i64)
+                );
+                panic!("{:?}", e);
+            }
+        }
+    }
+
     datapoint_info!(
-        "tip_router_cli.get_meta_merkle_root",
-        ("operator_address", operator_address.to_string(), String),
+        "tip_router_cli.create_meta_merkle_tree",
+        ("operator_address", operator_address, String),
         ("state", "meta_merkle_tree_creation", String),
         ("step", 4, i64),
         ("epoch", epoch, i64),
         ("duration_ms", start.elapsed().as_millis() as i64, i64)
     );

-    Ok(meta_merkle_tree)
+    meta_merkle_tree
+}
+
+#[derive(Debug)]
+pub enum MerkleRootError {
+    StakeMetaGeneratorError(String),
+    MerkleRootGeneratorError(String),
+    MerkleTreeError(String),
+}
+
+// TODO where did these come from?
+pub struct TipPaymentPubkeys {
+    _config_pda: Pubkey,
+    tip_pdas: Vec<Pubkey>,
+}
+
+#[derive(Clone, Debug, AnchorSerialize, AnchorDeserialize)]
+pub struct TipAccountConfig {
+    pub authority: Pubkey,
+    pub protocol_fee_bps: u64,
+    pub bump: u8,
+}
+
+fn derive_tip_payment_pubkeys(program_id: &Pubkey) -> TipPaymentPubkeys {
+    let config_pda = Pubkey::find_program_address(&[CONFIG_ACCOUNT_SEED], program_id).0;
+    let tip_pda_0 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_0], program_id).0;
+    let tip_pda_1 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_1], program_id).0;
+    let tip_pda_2 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_2], program_id).0;
+    let tip_pda_3 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_3], program_id).0;
+    let tip_pda_4 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_4], program_id).0;
+    let tip_pda_5 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_5], program_id).0;
+    let tip_pda_6 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_6], program_id).0;
+    let tip_pda_7 = Pubkey::find_program_address(&[TIP_ACCOUNT_SEED_7], program_id).0;
+
+    TipPaymentPubkeys {
+        _config_pda: config_pda,
+        tip_pdas: vec![
+            tip_pda_0, tip_pda_1, tip_pda_2, tip_pda_3, tip_pda_4, tip_pda_5, tip_pda_6, tip_pda_7,
+        ],
+    }
+}
+
+/// Convenience wrapper around [TipDistributionAccount]
+pub struct TipDistributionAccountWrapper {
+    pub tip_distribution_account: TipDistributionAccount,
+    pub account_data: AccountSharedData,
+    pub tip_distribution_pubkey: Pubkey,
 }

 fn get_validator_cmdline() -> Result<String> {
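`derive_tip_payment_pubkeys` (moved, not changed) is standard PDA derivation: each seed plus the program id maps deterministically to an address. A trimmed sketch assuming solana_sdk; the byte-string seeds here are hypothetical stand-ins for the `jito_tip_payment_sdk` constants:

```rust
use solana_sdk::pubkey::Pubkey;

fn derive(program_id: &Pubkey) -> (Pubkey, Vec<Pubkey>) {
    // One config PDA plus a fixed set of tip-account PDAs.
    let config = Pubkey::find_program_address(&[b"CONFIG_ACCOUNT"], program_id).0;
    let seeds: [&[u8]; 2] = [b"TIP_ACCOUNT_0", b"TIP_ACCOUNT_1"];
    let tips: Vec<Pubkey> = seeds
        .into_iter()
        .map(|seed| Pubkey::find_program_address(&[seed], program_id).0)
        .collect();
    (config, tips)
}

fn main() {
    let program_id = Pubkey::new_unique();
    let (config, tips) = derive(&program_id);
    // Derivation is deterministic: deriving again yields identical addresses.
    assert_eq!(config, derive(&program_id).0);
    assert_eq!(tips.len(), 2);
}
```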
diff --git a/tip-router-operator-cli/src/load_and_process_ledger.rs b/tip-router-operator-cli/src/load_and_process_ledger.rs
index 71fd293f..83b84868 100644
--- a/tip-router-operator-cli/src/load_and_process_ledger.rs
+++ b/tip-router-operator-cli/src/load_and_process_ledger.rs
@@ -125,7 +125,7 @@ pub fn load_and_process_ledger(
     process_options: ProcessOptions,
     snapshot_archive_path: Option<PathBuf>,
     incremental_snapshot_archive_path: Option<PathBuf>,
-    operator_address: &Pubkey,
+    operator_address: String,
 ) -> Result<(Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>), LoadAndProcessLedgerError> {
     let bank_snapshots_dir = if blockstore.is_primary_access() {
         blockstore.ledger_path().join("snapshot")
@@ -401,7 +401,7 @@ pub fn load_and_process_ledger(

     datapoint_info!(
         "tip_router_cli.get_bank",
-        ("operator", operator_address.to_string(), String),
+        ("operator", operator_address, String),
         ("state", "process_blockstore_from_root_start", String),
         ("step", 4, i64),
     );
diff --git a/tip-router-operator-cli/src/main.rs b/tip-router-operator-cli/src/main.rs
index 9a6b7019..4f232620 100644
--- a/tip-router-operator-cli/src/main.rs
+++ b/tip-router-operator-cli/src/main.rs
@@ -4,18 +4,21 @@ use ::{
     clap::Parser,
     ellipsis_client::EllipsisClient,
     log::{error, info},
+    meta_merkle_tree::generated_merkle_tree::{GeneratedMerkleTreeCollection, StakeMetaCollection},
     solana_metrics::set_host_id,
     solana_rpc_client::nonblocking::rpc_client::RpcClient,
-    solana_sdk::{
-        clock::DEFAULT_SLOTS_PER_EPOCH, pubkey::Pubkey, signer::keypair::read_keypair_file,
-    },
-    std::{path::PathBuf, str::FromStr, sync::Arc, time::Duration},
+    solana_sdk::{pubkey::Pubkey, signer::keypair::read_keypair_file},
+    std::{str::FromStr, sync::Arc, time::Duration},
     tip_router_operator_cli::{
         backup_snapshots::BackupSnapshotMonitor,
         claim::claim_mev_tips_with_emit,
-        cli::{Cli, Commands},
-        process_epoch::{get_previous_epoch_last_slot, process_epoch, wait_for_next_epoch},
+        cli::{Cli, Commands, SnapshotPaths},
+        create_merkle_tree_collection, create_meta_merkle_tree, create_stake_meta,
+        ledger_utils::get_bank_from_snapshot_at_slot,
+        load_bank_from_snapshot, merkle_tree_collection_file_name, meta_merkle_tree_file_name,
+        process_epoch, stake_meta_file_name,
         submit::{submit_recent_epochs_to_ncn, submit_to_ncn},
+        tip_router::get_ncn_config,
     },
     tokio::time::sleep,
 };
@@ -24,6 +27,10 @@ use ::{
 async fn main() -> Result<()> {
     env_logger::init();
     let cli = Cli::parse();
+
+    // Ensure the backup snapshots directory differs from the full snapshots directory
+    cli.force_different_backup_snapshot_dir();
+
     let keypair = read_keypair_file(&cli.keypair_path).expect("Failed to read keypair file");
     let rpc_client = EllipsisClient::from_rpc_with_timeout(
         RpcClient::new(cli.rpc_url.clone()),
@@ -33,13 +40,6 @@ async fn main() -> Result<()> {

     set_host_id(cli.operator_address.to_string());

-    // Ensure tx submission works
-    // let test_meta_merkle_root = [1; 32];
-    // let ix = spl_memo::build_memo(&test_meta_merkle_root.to_vec(), &[&keypair.pubkey()]);
-    // info!("Submitting test tx {:?}", test_meta_merkle_root);
-    // let tx = Transaction::new_with_payer(&[ix], Some(&keypair.pubkey()));
-    // rpc_client.process_transaction(tx, &[&keypair]).await?;
-
     info!( "CLI Arguments: keypair_path: {} cli.backup_snapshots_dir.display() );

+    cli.create_save_path();
+
     match cli.command {
         Commands::Run {
             ncn_address,
             tip_distribution_program_id,
             tip_payment_program_id,
             tip_router_program_id,
-            enable_snapshots,
+            save_snapshot,
             num_monitored_epochs,
-            start_next_epoch,
             override_target_slot,
+            starting_stage,
+            save_stages,
             set_merkle_roots,
             claim_tips,
         } => {
+            assert!(
+                num_monitored_epochs > 0,
+                "num-monitored-epochs must be greater than 0"
+            );
+
             info!("Running Tip Router...");
             info!("NCN Address: {}", ncn_address);
             info!(
                 "Tip Distribution Program ID: {}",
                 tip_distribution_program_id
             );
             info!("Tip Payment Program ID: {}", tip_payment_program_id);
             info!("Tip Router Program ID: {}", tip_router_program_id);
-            info!("Enable Snapshots: {}", enable_snapshots);
+            info!("Save Snapshots: {}", save_snapshot);
             info!("Num Monitored Epochs: {}", num_monitored_epochs);
-            info!("Start Next Epoch: {}", start_next_epoch);
             info!("Override Target Slot: {:?}", override_target_slot);
             info!("Submit as Memo: {}", cli.submit_as_memo);
+            info!("starting stage: {:?}", starting_stage);

             let rpc_client_clone = rpc_client.clone();
cli.full_snapshots_path.clone().unwrap(); let backup_snapshots_dir = cli.backup_snapshots_dir.clone(); let rpc_url = cli.rpc_url.clone(); - let cli_clone = cli.clone(); - let mut current_epoch = rpc_client.get_epoch_info().await?.epoch; + let cli_clone: Cli = cli.clone(); if !backup_snapshots_dir.exists() { info!( @@ -122,14 +129,18 @@ async fn main() -> Result<()> { } }); + let cli_clone: Cli = cli.clone(); // Track incremental snapshots and backup to `backup_snapshots_dir` tokio::spawn(async move { + let save_path = cli_clone.save_path; loop { if let Err(e) = BackupSnapshotMonitor::new( &rpc_url, full_snapshots_path.clone(), backup_snapshots_dir.clone(), override_target_slot, + save_path.clone(), + num_monitored_epochs, ) .run() .await @@ -146,7 +157,6 @@ async fn main() -> Result<()> { tokio::spawn(async move { loop { // Slow process with lots of account fetches so run every 30 minutes - sleep(Duration::from_secs(1800)).await; let epoch = if let Ok(epoch) = rpc_client.get_epoch_info().await { epoch.epoch.checked_sub(1).unwrap_or(epoch.epoch) } else { @@ -164,98 +174,39 @@ async fn main() -> Result<()> { { error!("Error claiming tips: {}", e); } + sleep(Duration::from_secs(1800)).await; } }); } - if start_next_epoch { - current_epoch = wait_for_next_epoch(&rpc_client, current_epoch).await; - } - - // Track runs that are starting right at the beginning of a new epoch - let mut new_epoch_rollover = start_next_epoch; - - loop { - // Get the last slot of the previous epoch - let (previous_epoch, previous_epoch_slot) = - if let Ok((epoch, slot)) = get_previous_epoch_last_slot(&rpc_client).await { - (epoch, slot) - } else { - error!("Error getting previous epoch slot"); - continue; - }; - - info!("Processing slot {} for previous epoch", previous_epoch_slot); - - // Process the epoch - match process_epoch( - &rpc_client, - previous_epoch_slot, - previous_epoch, - &tip_distribution_program_id, - &tip_payment_program_id, - &tip_router_program_id, - &ncn_address, - enable_snapshots, - new_epoch_rollover, - &cli, - ) - .await - { - Ok(_) => info!("Successfully processed epoch"), - Err(e) => { - error!("Error processing epoch: {}", e); - } - } - - // Wait for epoch change - current_epoch = wait_for_next_epoch(rpc_client.as_ref(), current_epoch).await; - - new_epoch_rollover = true; - } - } - Commands::SnapshotSlot { - ncn_address, - tip_distribution_program_id, - tip_payment_program_id, - tip_router_program_id, - enable_snapshots, - slot, - } => { - info!("Snapshotting slot..."); - let epoch = slot / DEFAULT_SLOTS_PER_EPOCH; - // Process the epoch - match process_epoch( - &rpc_client, - slot, - epoch, + // Endless loop that transitions between stages of the operator process. 
+ process_epoch::loop_stages( + rpc_client, + cli, + starting_stage, + override_target_slot, + &tip_router_program_id, &tip_distribution_program_id, &tip_payment_program_id, - &tip_router_program_id, &ncn_address, - enable_snapshots, - false, - &cli, + save_snapshot, + save_stages, ) - .await - { - Ok(_) => info!("Successfully processed slot"), - Err(e) => { - error!("Error processing epoch: {}", e); - } - } + .await?; + } + Commands::SnapshotSlot { slot } => { + info!("Snapshotting slot..."); + + load_bank_from_snapshot(cli, slot, true); } Commands::SubmitEpoch { ncn_address, tip_distribution_program_id, tip_router_program_id, epoch, + set_merkle_roots, } => { - let meta_merkle_tree_path = PathBuf::from(format!( - "{}/meta_merkle_tree_{}.json", - cli.meta_merkle_tree_dir.display(), - epoch - )); + let meta_merkle_tree_path = cli.save_path.join(meta_merkle_tree_file_name(epoch)); info!( "Submitting epoch {} from {}...", epoch, @@ -272,13 +223,13 @@ async fn main() -> Result<()> { &tip_router_program_id, &tip_distribution_program_id, cli.submit_as_memo, - true, + set_merkle_roots, ) .await?; } Commands::ClaimTips { - tip_distribution_program_id, tip_router_program_id, + tip_distribution_program_id, ncn_address, epoch, } => { @@ -294,6 +245,91 @@ async fn main() -> Result<()> { ) .await?; } + Commands::CreateStakeMeta { + epoch, + slot, + tip_distribution_program_id, + tip_payment_program_id, + save, + } => { + let SnapshotPaths { + ledger_path, + account_paths, + full_snapshots_path: _, + incremental_snapshots_path: _, + backup_snapshots_dir, + } = cli.get_snapshot_paths(); + + // We can safely expect to use the backup_snapshots_dir as the full snapshot path because + // _get_bank_from_snapshot_at_slot_ expects the snapshot at the exact `slot` to have + // already been taken. + let bank = get_bank_from_snapshot_at_slot( + slot, + &backup_snapshots_dir, + &backup_snapshots_dir, + account_paths, + ledger_path.as_path(), + )?; + + create_stake_meta( + cli.operator_address, + epoch, + &Arc::new(bank), + &tip_distribution_program_id, + &tip_payment_program_id, + &cli.save_path, + save, + ); + } + Commands::CreateMerkleTreeCollection { + tip_router_program_id, + ncn_address, + epoch, + save, + } => { + // Load the stake_meta_collection from disk + let stake_meta_collection = match StakeMetaCollection::new_from_file( + &cli.save_path.join(stake_meta_file_name(epoch)), + ) { + Ok(stake_meta_collection) => stake_meta_collection, + Err(e) => panic!("{}", e), + }; + let config = get_ncn_config(&rpc_client, &tip_router_program_id, &ncn_address).await?; + // Tip Router looks backwards in time (typically current_epoch - 1) to calculate + // distributions. Meanwhile the NCN's Ballot is for the current_epoch. 
So we + // use epoch + 1 here + let ballot_epoch = epoch.checked_add(1).unwrap(); + let protocol_fee_bps = config.fee_config.adjusted_total_fees_bps(ballot_epoch)?; + + // Generate the merkle tree collection + create_merkle_tree_collection( + cli.operator_address, + &tip_router_program_id, + stake_meta_collection, + epoch, + &ncn_address, + protocol_fee_bps, + &cli.save_path, + save, + ); + } + Commands::CreateMetaMerkleTree { epoch, save } => { + // Load the merkle tree collection from disk + let merkle_tree_collection = match GeneratedMerkleTreeCollection::new_from_file( + &cli.save_path.join(merkle_tree_collection_file_name(epoch)), + ) { + Ok(merkle_tree_collection) => merkle_tree_collection, + Err(e) => panic!("{}", e), + }; + + create_meta_merkle_tree( + cli.operator_address, + merkle_tree_collection, + epoch, + &cli.save_path, + save, + ); + } } Ok(()) } diff --git a/tip-router-operator-cli/src/process_epoch.rs b/tip-router-operator-cli/src/process_epoch.rs index fa24d717..3dcac158 100644 --- a/tip-router-operator-cli/src/process_epoch.rs +++ b/tip-router-operator-cli/src/process_epoch.rs @@ -1,44 +1,57 @@ use std::{ path::PathBuf, str::FromStr, + sync::Arc, time::{Duration, Instant}, }; use anyhow::Result; use ellipsis_client::EllipsisClient; use log::{error, info}; -use solana_metrics::{datapoint_error, datapoint_info}; +use meta_merkle_tree::generated_merkle_tree::{GeneratedMerkleTreeCollection, StakeMetaCollection}; +use solana_metrics::datapoint_error; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::pubkey::Pubkey; +use solana_runtime::bank::Bank; +use solana_sdk::{epoch_info::EpochInfo, pubkey::Pubkey, signature::read_keypair_file}; use tokio::time; use crate::{ - backup_snapshots::SnapshotInfo, get_meta_merkle_root, tip_router::get_ncn_config, Cli, + backup_snapshots::SnapshotInfo, cli::SnapshotPaths, create_merkle_tree_collection, + create_meta_merkle_tree, create_stake_meta, ledger_utils::get_bank_from_snapshot_at_slot, + load_bank_from_snapshot, merkle_tree_collection_file_name, meta_merkle_tree_file_name, + stake_meta_file_name, submit::submit_to_ncn, tip_router::get_ncn_config, Cli, OperatorState, }; const MAX_WAIT_FOR_INCREMENTAL_SNAPSHOT_TICKS: u64 = 1200; // Experimentally determined const OPTIMAL_INCREMENTAL_SNAPSHOT_SLOT_RANGE: u64 = 800; // Experimentally determined -pub async fn wait_for_next_epoch(rpc_client: &RpcClient, current_epoch: u64) -> u64 { +pub async fn wait_for_next_epoch(rpc_client: &RpcClient, current_epoch: u64) -> EpochInfo { loop { tokio::time::sleep(Duration::from_secs(10)).await; // Check every 10 seconds - let new_epoch = match rpc_client.get_epoch_info().await { - Ok(info) => info.epoch, + let new_epoch_info = match rpc_client.get_epoch_info().await { + Ok(info) => info, Err(e) => { error!("Error getting epoch info: {:?}", e); continue; } }; - if new_epoch > current_epoch { - info!("New epoch detected: {} -> {}", current_epoch, new_epoch); - return new_epoch; + if new_epoch_info.epoch > current_epoch { + info!( + "New epoch detected: {} -> {}", + current_epoch, new_epoch_info.epoch + ); + return new_epoch_info; } } } pub async fn get_previous_epoch_last_slot(rpc_client: &RpcClient) -> Result<(u64, u64)> { let epoch_info = rpc_client.get_epoch_info().await?; + calc_prev_epoch_and_final_slot(&epoch_info) +} + +pub fn calc_prev_epoch_and_final_slot(epoch_info: &EpochInfo) -> Result<(u64, u64)> { let current_slot = epoch_info.absolute_slot; let slot_index = epoch_info.slot_index; @@ -86,144 +99,194 @@ pub async fn 
wait_for_optimal_incremental_snapshot( } #[allow(clippy::too_many_arguments)] -pub async fn process_epoch( - client: &EllipsisClient, - target_slot: u64, - target_epoch: u64, +pub async fn loop_stages( + rpc_client: EllipsisClient, + cli: Cli, + starting_stage: OperatorState, + override_target_slot: Option<u64>, + tip_router_program_id: &Pubkey, tip_distribution_program_id: &Pubkey, tip_payment_program_id: &Pubkey, - tip_router_program_id: &Pubkey, ncn_address: &Pubkey, - snapshots_enabled: bool, - new_epoch_rollover: bool, - cli_args: &Cli, + enable_snapshots: bool, + save_stages: bool, ) -> Result<()> { - info!("Processing epoch {:?}", target_epoch); - - let start = Instant::now(); - - let ledger_path = cli_args.ledger_path.clone(); - let account_paths = None; - let full_snapshots_path = cli_args.full_snapshots_path.clone(); - let incremental_snapshots_path = cli_args.backup_snapshots_dir.clone(); - let operator_address = Pubkey::from_str(&cli_args.operator_address).unwrap(); - let meta_merkle_tree_dir = cli_args.meta_merkle_tree_dir.clone(); - - // Get the protocol fees - let ncn_config = get_ncn_config(client, tip_router_program_id, ncn_address).await?; - let tip_router_target_epoch = target_epoch - .checked_add(1) - .ok_or_else(|| anyhow::anyhow!("tip_router_target_epoch overflow"))?; - let adjusted_total_fees = ncn_config - .fee_config - .adjusted_total_fees_bps(tip_router_target_epoch)?; - - let account_paths = account_paths.map_or_else(|| vec![ledger_path.clone()], |paths| paths); - let full_snapshots_path = full_snapshots_path.map_or(ledger_path, |path| path); - - // Wait for optimal incremental snapshot to be available since they can be delayed in a new epoch - if new_epoch_rollover { - wait_for_optimal_incremental_snapshot(incremental_snapshots_path.clone(), target_slot) - .await?; - } + let keypair = read_keypair_file(&cli.keypair_path).expect("Failed to read keypair file"); + let mut current_epoch_info = rpc_client.get_epoch_info().await?; - // Generate merkle root from ledger - let meta_merkle_tree = match get_meta_merkle_root( - cli_args.ledger_path.as_path(), - account_paths, - full_snapshots_path, - incremental_snapshots_path, - &target_slot, - tip_distribution_program_id, - "", // TODO out_path is not used, unsure what should be put here. Maybe `snapshot_output_dir` from cli args? 
- tip_payment_program_id, - tip_router_program_id, - ncn_address, - &operator_address, - target_epoch, - adjusted_total_fees, - snapshots_enabled, - &meta_merkle_tree_dir, - ) { - Ok(tree) => { - datapoint_info!( - "tip_router_cli.process_epoch", - ("operator_address", operator_address.to_string(), String), - ("epoch", target_epoch, i64), - ("status", "success", String), - ("state", "merkle_root_generation", String), - ("duration_ms", start.elapsed().as_millis() as i64, i64) - ); - tree - } - Err(e) => { - datapoint_error!( - "tip_router_cli.process_epoch", - ("operator_address", operator_address.to_string(), String), - ("epoch", target_epoch, i64), - ("status", "error", String), - ("error", format!("{:?}", e), String), - ("state", "merkle_root_generation", String), - ("duration_ms", start.elapsed().as_millis() as i64, i64) - ); - return Err(anyhow::anyhow!("Failed to generate merkle root: {:?}", e)); - } + // Track runs that are starting right at the beginning of a new epoch + let operator_address = cli.operator_address.clone(); + let mut stage = starting_stage; + let mut bank: Option<Arc<Bank>> = None; + let mut stake_meta_collection: Option<StakeMetaCollection> = None; + let mut merkle_tree_collection: Option<GeneratedMerkleTreeCollection> = None; + let mut epoch_to_process = current_epoch_info.epoch.saturating_sub(1); + let mut slot_to_process = if let Some(slot) = override_target_slot { + slot + } else { + let (_, prev_slot) = calc_prev_epoch_and_final_slot(&current_epoch_info)?; + prev_slot }; + loop { + match stage { + OperatorState::LoadBankFromSnapshot => { + let incremental_snapshots_path = cli.backup_snapshots_dir.clone(); + wait_for_optimal_incremental_snapshot(incremental_snapshots_path, slot_to_process) + .await?; - // Write meta merkle tree to file - let meta_merkle_tree_path = meta_merkle_tree_dir.join(format!("meta_merkle_tree_{}.json", target_epoch)); - let meta_merkle_tree_json = match serde_json::to_string(&meta_merkle_tree) { - Ok(json) => json, - Err(e) => { - datapoint_error!( - "tip_router_cli.process_epoch", - ("operator_address", operator_address.to_string(), String), - ("epoch", target_epoch, i64), - ("status", "error", String), - ("error", format!("{:?}", e), String), - ("state", "merkle_root_serialization", String), - ("duration_ms", start.elapsed().as_millis() as i64, i64) - ); - return Err(anyhow::anyhow!( - "Failed to serialize meta merkle tree: {}", - e - )); - } - }; + bank = Some(load_bank_from_snapshot( + cli.clone(), + slot_to_process, + enable_snapshots, + )); + // Transition to the next stage + stage = OperatorState::CreateStakeMeta; + } + OperatorState::CreateStakeMeta => { + let start = Instant::now(); + if bank.is_none() { + let SnapshotPaths { + ledger_path, + account_paths, + full_snapshots_path: _, + incremental_snapshots_path: _, + backup_snapshots_dir, + } = cli.get_snapshot_paths(); - if let Err(e) = std::fs::write(meta_merkle_tree_path, meta_merkle_tree_json) { - datapoint_error!( - "tip_router_cli.process_epoch", - ("operator_address", operator_address.to_string(), String), - ("epoch", target_epoch, i64), - ("status", "error", String), - ("error", format!("{:?}", e), String), - ("state", "merkle_root_file_write", String), - ("duration_ms", start.elapsed().as_millis() as i64, i64) - ); - return Err(anyhow::anyhow!( - "Failed to write meta merkle tree to file: {}", - e - )); - } + // We can safely expect to use the backup_snapshots_dir as the full snapshot path because + // _get_bank_from_snapshot_at_slot_ expects the snapshot at the exact `slot` to have + // already been taken. 
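+ // Note: if no snapshot archive exists at exactly `slot_to_process` in backup_snapshots_dir, the load below returns an Err; that arm emits a datapoint_error and panics.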
+ let maybe_bank = get_bank_from_snapshot_at_slot( + slot_to_process, + &backup_snapshots_dir, + &backup_snapshots_dir, + account_paths, + ledger_path.as_path(), + ); + match maybe_bank { + Ok(some_bank) => bank = Some(Arc::new(some_bank)), + Err(e) => { + datapoint_error!( + "tip_router_cli.create_stake_meta", + ("operator_address", operator_address, String), + ("epoch", epoch_to_process, i64), + ("status", "error", String), + ("error", e.to_string(), String), + ("state", "create_stake_meta", String), + ("duration_ms", start.elapsed().as_millis() as i64, i64) + ); + panic!("{}", e.to_string()); + } + } + } + stake_meta_collection = Some(create_stake_meta( + operator_address.clone(), + epoch_to_process, + bank.as_ref().expect("Bank was not set"), + tip_distribution_program_id, + tip_payment_program_id, + &cli.save_path, + save_stages, + )); + // we should be able to safely drop the bank in this loop + bank = None; + // Transition to the next stage + stage = OperatorState::CreateMerkleTreeCollection; + } + OperatorState::CreateMerkleTreeCollection => { + let some_stake_meta_collection = match stake_meta_collection.to_owned() { + Some(collection) => collection, + None => { + let file = cli.save_path.join(stake_meta_file_name(epoch_to_process)); + StakeMetaCollection::new_from_file(&file)? + } + }; + let config = + get_ncn_config(&rpc_client, tip_router_program_id, ncn_address).await?; + // Tip Router looks backwards in time (typically current_epoch - 1) to calculate + // distributions. Meanwhile the NCN's Ballot is for the current_epoch. So we + // use epoch + 1 here + let ballot_epoch = epoch_to_process.checked_add(1).unwrap(); + let protocol_fee_bps = config.fee_config.adjusted_total_fees_bps(ballot_epoch)?; - // Emit a datapoint for starting the epoch processing - datapoint_info!( - "tip_router_cli.process_epoch", - ("operator_address", operator_address.to_string(), String), - ("epoch", target_epoch, i64), - ("status", "success", String), - ("state", "epoch_processing_completed", String), - ( - "meta_merkle_root", - format!("{:?}", meta_merkle_tree.merkle_root), - String - ), - ("duration_ms", start.elapsed().as_millis() as i64, i64) - ); - - solana_metrics::flush(); + // Generate the merkle tree collection + merkle_tree_collection = Some(create_merkle_tree_collection( + cli.operator_address.clone(), + tip_router_program_id, + some_stake_meta_collection, + epoch_to_process, + ncn_address, + protocol_fee_bps, + &cli.save_path, + save_stages, + )); - Ok(()) + stake_meta_collection = None; + // Transition to the next stage + stage = OperatorState::CreateMetaMerkleTree; + } + OperatorState::CreateMetaMerkleTree => { + let some_merkle_tree_collection = match merkle_tree_collection.to_owned() { + Some(collection) => collection, + None => { + let file = cli + .save_path + .join(merkle_tree_collection_file_name(epoch_to_process)); + GeneratedMerkleTreeCollection::new_from_file(&file)? 
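+ // Fallback when resuming at this stage: the collection is read back from save_path, as written by an earlier run or by the CreateMerkleTreeCollection subcommand.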
+ } + }; + + create_meta_merkle_tree( + cli.operator_address.clone(), + some_merkle_tree_collection, + epoch_to_process, + &cli.save_path, + // This is defaulted to true because the output file is required by the + // task that sets TipDistributionAccounts' merkle roots + true, + ); + stage = OperatorState::CastVote; + } + OperatorState::CastVote => { + let meta_merkle_tree_path = PathBuf::from(format!( + "{}/{}", + cli.save_path.display(), + meta_merkle_tree_file_name(epoch_to_process) + )); + let operator_address = Pubkey::from_str(&cli.operator_address)?; + submit_to_ncn( + &rpc_client, + &keypair, + &operator_address, + &meta_merkle_tree_path, + epoch_to_process, + ncn_address, + tip_router_program_id, + tip_distribution_program_id, + cli.submit_as_memo, + // We let the submit task handle setting merkle roots + false, + ) + .await?; + stage = OperatorState::WaitForNextEpoch; + } + OperatorState::WaitForNextEpoch => { + current_epoch_info = + wait_for_next_epoch(&rpc_client, current_epoch_info.epoch).await; + // Get the last slot of the previous epoch + let (previous_epoch, previous_epoch_slot) = + if let Ok((epoch, slot)) = get_previous_epoch_last_slot(&rpc_client).await { + (epoch, slot) + } else { + // TODO: Make a datapoint error + error!("Error getting previous epoch slot"); + continue; + }; + slot_to_process = previous_epoch_slot; + epoch_to_process = previous_epoch; + + stage = OperatorState::LoadBankFromSnapshot; + } + } + } } diff --git a/tip-router-operator-cli/src/stake_meta_generator.rs b/tip-router-operator-cli/src/stake_meta_generator.rs index e2324da0..46e8bcd7 100644 --- a/tip-router-operator-cli/src/stake_meta_generator.rs +++ b/tip-router-operator-cli/src/stake_meta_generator.rs @@ -2,7 +2,6 @@ use std::{ collections::HashMap, fmt::{Debug, Display, Formatter}, mem::size_of, - path::{Path, PathBuf}, sync::Arc, }; @@ -20,20 +19,16 @@ use solana_ledger::{ bank_forks_utils::BankForksUtilsError, blockstore::BlockstoreError, blockstore_processor::BlockstoreProcessorError, }; -use solana_metrics::datapoint_error; use solana_program::{stake_history::StakeHistory, sysvar}; use solana_runtime::{bank::Bank, stakes::StakeAccount}; use solana_sdk::{ account::{from_account, ReadableAccount, WritableAccount}, - clock::Slot, pubkey::Pubkey, }; use solana_vote::vote_account::VoteAccount; use thiserror::Error; -use crate::{ - derive_tip_payment_pubkeys, ledger_utils::get_bank_from_ledger, TipDistributionAccountWrapper, -}; +use crate::{derive_tip_payment_pubkeys, TipDistributionAccountWrapper}; #[derive(Error, Debug)] pub enum StakeMetaGeneratorError { @@ -72,70 +67,6 @@ impl Display for StakeMetaGeneratorError { } } -/// Creates a bank from the paths at the desired slot and generates the StakeMetaCollection for -/// that slot. Optionally writing the result as JSON file to disk. 
-#[allow(clippy::too_many_arguments)] -pub fn generate_stake_meta( - operator_address: &Pubkey, - ledger_path: &Path, - account_paths: Vec<PathBuf>, - full_snapshots_path: PathBuf, - incremental_snapshots_path: PathBuf, - desired_slot: &Slot, - tip_distribution_program_id: &Pubkey, - _out_path: &str, - tip_payment_program_id: &Pubkey, - snapshots_enabled: bool, -) -> Result<StakeMetaCollection, StakeMetaGeneratorError> { - info!("Creating bank from ledger path..."); - let res = std::panic::catch_unwind(|| { - get_bank_from_ledger( - operator_address, - ledger_path, - account_paths, - full_snapshots_path, - incremental_snapshots_path, - desired_slot, - snapshots_enabled, - ) - }); - let bank = match res { - Ok(bank) => bank, - Err(e) => { - #[allow(clippy::option_if_let_else)] - let error_str = if let Some(s) = e.downcast_ref::<String>() { - s.to_string() - } else if let Some(s) = e.downcast_ref::<&'static str>() { - s.to_string() - } else { - // If we can't get a string, try to get any Debug implementation - match e.downcast_ref::<Box<dyn Debug>>() { - Some(debug_val) => format!("{:?}", debug_val), - None => "Unknown panic payload".to_string(), - } - }; - error!("Panicked while creating bank from ledger: {}", error_str); - datapoint_error!( - "tip_router_cli.get_bank", - ("operator", operator_address.to_string(), String), - ("status", "error", String), - ("state", "get_bank_from_ledger", String), - ("error", error_str, String), - ); - return Err(StakeMetaGeneratorError::PanicError(error_str)); - } - }; - - info!("Generating stake_meta_collection object..."); - let stake_meta_coll = generate_stake_meta_collection(&bank, tip_distribution_program_id, tip_payment_program_id)?; - - // Explicitly drop to clean up disk space - drop(bank); - - Ok(stake_meta_coll) -} - fn tip_distribution_account_from_tda_wrapper( tda_wrapper: TipDistributionAccountWrapper, // The amount that will be left remaining in the tda to maintain rent exemption status. 
diff --git a/tip-router-operator-cli/src/submit.rs b/tip-router-operator-cli/src/submit.rs index 88f4e357..f8670f78 100644 --- a/tip-router-operator-cli/src/submit.rs +++ b/tip-router-operator-cli/src/submit.rs @@ -18,6 +18,7 @@ use solana_client::{ use solana_metrics::{datapoint_error, datapoint_info}; use solana_sdk::{pubkey::Pubkey, signature::Keypair}; +use crate::meta_merkle_tree_file_name; use crate::{ tip_router::{cast_vote, get_ncn_config, set_merkle_roots_batched}, Cli, @@ -40,8 +41,8 @@ pub async fn submit_recent_epochs_to_ncn( for i in 0..num_monitored_epochs { let process_epoch = epoch.epoch.checked_sub(i).unwrap(); - let meta_merkle_tree_dir = cli_args.meta_merkle_tree_dir.clone(); - let target_meta_merkle_tree_file = format!("meta_merkle_tree_{}.json", process_epoch); + let meta_merkle_tree_dir = cli_args.save_path.clone(); + let target_meta_merkle_tree_file = meta_merkle_tree_file_name(process_epoch); let target_meta_merkle_tree_path = meta_merkle_tree_dir.join(target_meta_merkle_tree_file); if !target_meta_merkle_tree_path.exists() { continue; @@ -136,6 +137,14 @@ pub async fn submit_to_ncn( None => true, }; + info!( + "Determining if operator needs to vote...\n\ + should_cast_vote: {}\n\ + is_voting_valid: {} + ", + should_cast_vote, is_voting_valid + ); + if should_cast_vote && is_voting_valid { let res = cast_vote( client, diff --git a/tip-router-operator-cli/tests/integration_tests.rs b/tip-router-operator-cli/tests/integration_tests.rs index 58f1eb3b..bee692e8 100644 --- a/tip-router-operator-cli/tests/integration_tests.rs +++ b/tip-router-operator-cli/tests/integration_tests.rs @@ -1,7 +1,4 @@ -use std::{ - fs, - path::{Path, PathBuf}, -}; +use std::{fs, path::PathBuf}; use anchor_lang::prelude::AnchorSerialize; use jito_tip_distribution_sdk::jito_tip_distribution::ID as TIP_DISTRIBUTION_ID; @@ -21,7 +18,7 @@ use solana_sdk::{ transaction::Transaction, }; use tempfile::TempDir; -use tip_router_operator_cli::{get_meta_merkle_root, TipAccountConfig}; +use tip_router_operator_cli::TipAccountConfig; #[allow(dead_code)] struct TestContext { @@ -183,76 +180,6 @@ impl TestContext { } } -#[tokio::test] -async fn test_meta_merkle_creation_from_ledger() { - // 1. Setup - create necessary variables/arguments - let ledger_path = Path::new("tests/fixtures/test-ledger"); - let account_paths = vec![ledger_path.join("accounts/run")]; - let full_snapshots_path = PathBuf::from("tests/fixtures/test-ledger"); - let desired_slot = &144; - let tip_distribution_program_id = &TIP_DISTRIBUTION_ID; - let out_path = "tests/fixtures/output.json"; - let tip_payment_program_id = &TIP_PAYMENT_ID; - let ncn_address = Pubkey::new_unique(); - let operator_address = Pubkey::new_unique(); - let epoch = 0u64; - const PROTOCOL_FEE_BPS: u64 = 300; - - // 2. Call the function - let meta_merkle_tree = get_meta_merkle_root( - ledger_path, - account_paths, - full_snapshots_path.clone(), - full_snapshots_path, - desired_slot, - tip_distribution_program_id, - out_path, - tip_payment_program_id, - &jito_tip_router_program::id(), - &ncn_address, - &operator_address, - epoch, - PROTOCOL_FEE_BPS, - false, - &ledger_path.to_path_buf(), - ) - .unwrap(); - - // 3. 
More comprehensive validations - assert_ne!( - meta_merkle_tree.merkle_root, [0; 32], - "Merkle root should not be zero" - ); - - // Verify structure - assert!( - meta_merkle_tree.num_nodes > 0, - "Should have validator nodes" - ); - - // Verify each node - for node in &meta_merkle_tree.tree_nodes { - // Verify node has required fields - assert_ne!( - node.tip_distribution_account, - Pubkey::default(), - "Node should have valid tip distribution account" - ); - assert!( - node.max_total_claim > 0, - "Node should have positive max claim" - ); - assert!( - node.max_num_nodes > 0, - "Node should have positive max nodes" - ); - assert!(node.proof.is_some(), "Node should have a proof"); - } - - // Verify the proofs are valid - meta_merkle_tree.verify_proof().unwrap(); -} - #[tokio::test] async fn test_merkle_tree_generation() -> Result<(), Box<dyn std::error::Error>> { // Constants