From 9495851ff9be5fe7928191dc8e4511b052e39b44 Mon Sep 17 00:00:00 2001
From: Manolis Liolios
Date: Wed, 14 Feb 2024 14:47:13 +0200
Subject: [PATCH] [SuiNS Indexer] Dynamic config & edge case fix (#16223)

## Description

This PR:

1. Allows dynamic config for the registry + subdomain wrapper ID from the env file (so we can deploy on different networks).
2. Gets rid of all S3-related envs; we can just use the public endpoint.
3. Rewrites the logic to use a single loop over all transactions, processed in sequence (so that at the end of each checkpoint we only keep the LAST state for updates/deletions per name record).

## Test Plan

Re-run the existing tests:

```
crates/suins-indexer$ cargo test
```

---
If your changes are not user-facing and do not break anything, you can skip the following section. Otherwise, please briefly describe what has changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes

---
 crates/suins-indexer/.env.sample              |  12 +-
 crates/suins-indexer/src/indexer.rs           | 255 +++++++++---------
 crates/suins-indexer/src/main.rs              |  60 +++--
 .../tests/checkpoint_process_tests.rs         |  19 +-
 4 files changed, 185 insertions(+), 161 deletions(-)

diff --git a/crates/suins-indexer/.env.sample b/crates/suins-indexer/.env.sample
index b32cafb03b37e..c24ac51dd410b 100644
--- a/crates/suins-indexer/.env.sample
+++ b/crates/suins-indexer/.env.sample
@@ -1,7 +1,7 @@
 DATABASE_URL=postgres://:@localhost:5432/
-AWS_S3_ENDPOINT=https://s3..amazonaws.com/
-AWS_ACCESS_KEY_ID=
-AWS_ACCESS_SECRET_KEY=
-AWS_SESSION_TOKEN=
-BACKFILL_PROGRESS_FILE_PATH=/tmp/backfill_progress # expects a file in the format { "indexing_name": }
-CHECKPOINTS_DIR=/tmp/backfill_progress
+REMOTE_STORAGE=https://checkpoints.mainnet.sui.io
+BACKFILL_PROGRESS_FILE_PATH=/tmp/backfill_progress # expects a file in the format { "suins_indexing": }
+CHECKPOINTS_DIR=/tmp/checkpoints
+REGISTRY_ID=0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106
+SUBDOMAIN_WRAPPER_TYPE=0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106::subdomain_registration::SubDomainRegistration
+
diff --git a/crates/suins-indexer/src/indexer.rs b/crates/suins-indexer/src/indexer.rs
index c272e87eb672b..f7fb1ce489851 100644
--- a/crates/suins-indexer/src/indexer.rs
+++ b/crates/suins-indexer/src/indexer.rs
@@ -9,28 +9,23 @@ use std::{
 use move_core_types::language_storage::StructTag;
 use sui_json_rpc::name_service::{Domain, NameRecord, SubDomainRegistration};
 use sui_types::{
-    base_types::{ObjectID, SequenceNumber, SuiAddress},
+    base_types::{ObjectID, SuiAddress},
     dynamic_field::Field,
-    full_checkpoint_content::CheckpointData,
+    full_checkpoint_content::{CheckpointData, CheckpointTransaction},
     object::Object,
 };
 
 use crate::models::VerifiedDomain;
 
-/// TODO(manos): Hardcode mainnet addresses.
 const REGISTRY_TABLE_ID: &str =
     "0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106";
+/// TODO(manos): Hardcode mainnet once we publish the subdomains package.
 const SUBDOMAIN_REGISTRATION_TYPE: &str =
     "0xPackageIdTBD::subdomain_registration::SubDomainRegistration";
 
 #[derive(Debug, Clone)]
-pub struct NameRecordChange {
-    /// the NameRecord entry in the table (DF).
-    field: Field<Domain, NameRecord>,
-    /// the DF's ID.
-    field_id: ObjectID,
-    sequence_number: SequenceNumber,
-}
+pub struct NameRecordChange(Field<Domain, NameRecord>);
+
 pub struct SuinsIndexer {
     registry_table_id: SuiAddress,
     subdomain_wrapper_type: StructTag,
@@ -77,108 +72,160 @@ impl SuinsIndexer {
             .is_some_and(|owner| owner == self.registry_table_id)
     }
 
+    /// Processes a checkpoint and produces a list of `updates` and a list of `removals`
+    ///
+    /// We can then use these to execute our DB bulk insertions + bulk deletions.
+    ///
+    /// Returns
+    /// - `Vec<VerifiedDomain>`: A list of NameRecord updates for the database (including sequence number)
+    /// - `Vec<String>`: A list of IDs to be deleted from the database (`field_id` is the matching column)
+    pub fn process_checkpoint(&self, data: &CheckpointData) -> (Vec<VerifiedDomain>, Vec<String>) {
+        let mut checkpoint = SuinsIndexerCheckpoint::new(data.checkpoint_summary.sequence_number);
+
+        // loop through all the transactions in the checkpoint
+        // Since the transactions are sequenced inside the checkpoint, we can safely assume
+        // that we have the latest data for each name record in the end of the loop.
+        for transaction in &data.transactions {
+            // Add all name record changes to the name_records HashMap.
+            // Remove any removals that got re-created.
+            checkpoint.parse_record_changes(self, &transaction.output_objects);
+
+            // Gather all removals from the transaction,
+            // and delete any name records from the name_records if it got deleted.
+            checkpoint.parse_record_deletions(self, transaction);
+        }
+
+        (
+            // Convert our name_records & wrappers into a list of updates for the DB.
+            checkpoint.prepare_db_updates(),
+            checkpoint
+                .removals
+                .into_iter()
+                .map(|id| id.to_string())
+                .collect(),
+        )
+    }
+}
+
+pub struct SuinsIndexerCheckpoint {
+    /// A list of name records that have been updated in the checkpoint.
+    name_records: HashMap<ObjectID, NameRecordChange>,
+    /// A list of subdomain wrappers that have been created in the checkpoint.
+    subdomain_wrappers: HashMap<String, String>,
+    /// A list of name records that have been deleted in the checkpoint.
+    removals: HashSet<ObjectID>,
+    /// The sequence number of the checkpoint.
+    checkpoint_sequence_number: u64,
+}
+
+impl SuinsIndexerCheckpoint {
+    pub fn new(checkpoint_sequence_number: u64) -> Self {
+        Self {
+            name_records: HashMap::new(),
+            subdomain_wrappers: HashMap::new(),
+            removals: HashSet::new(),
+            checkpoint_sequence_number,
+        }
+    }
+
     /// Parses the name record changes + subdomain wraps.
     /// and pushes them into the supplied vector + hashmap.
     ///
     /// It is implemented in a way to do just a single iteration over the objects.
-    pub fn parse_name_record_changes(
-        &self,
-        objects: &[&Object],
-    ) -> (HashMap<ObjectID, NameRecordChange>, HashMap<String, String>) {
-        let mut name_records: HashMap<ObjectID, NameRecordChange> = HashMap::new();
-        let mut subdomain_wrappers: HashMap<String, String> = HashMap::new();
-
-        for &object in objects {
+    pub fn parse_record_changes(&mut self, config: &SuinsIndexer, objects: &[Object]) {
+        for object in objects {
             // Parse all the changes to a `NameRecord`
-            if self.is_name_record(object) {
+            if config.is_name_record(object) {
                 let name_record: Field<Domain, NameRecord> = object
                     .to_rust()
                     .unwrap_or_else(|| panic!("Failed to parse name record for {:?}", object));
-                let id: ObjectID = object.id();
-
-                // If we already have a newer version of the same name record, skip insertion.
-                // That prevents us from falling into PG's bulk insertions double conflicts.
-                if name_records
-                    .get(&id)
-                    .is_some_and(|x| x.sequence_number > object.version())
-                {
-                    continue;
-                }
-
-                name_records.insert(
-                    id,
-                    NameRecordChange {
-                        field: name_record,
-                        field_id: object.id(),
-                        sequence_number: object.version(),
-                    },
-                );
+                let id = object.id();
+
+                // Remove from the removals list if it's there.
+                // The reason it might have been there is that the same name record might have been
+                // deleted in a previous transaction in the same checkpoint, and now it got re-created.
+                self.removals.remove(&id);
+
+                self.name_records.insert(id, NameRecordChange(name_record));
             }
 
             // Parse subdomain wrappers and save them in our hashmap.
             // Later, we'll save the id of the wrapper in the name record.
             // NameRecords & their equivalent SubdomainWrappers are always created in the same PTB, so we can safely assume
             // that the wrapper will be created on the same checkpoint as the name record and vice versa.
-            if self.is_subdomain_wrapper(object) {
+            if config.is_subdomain_wrapper(object) {
                 let sub_domain: SubDomainRegistration = object.to_rust().unwrap();
-                subdomain_wrappers.insert(
+                self.subdomain_wrappers.insert(
                     sub_domain.nft.domain_name,
                     sub_domain.id.id.bytes.to_string(),
                 );
             };
         }
-
-        (name_records, subdomain_wrappers)
     }
 
-    /// For each input object, we're parsing the name record deletions
-    /// A deletion we want to track is a deleted object which is of `NameRecord` type.
-    /// Domain replacements do not count as deletions, but instead are an update to the latest state.
-    pub fn parse_name_record_deletions(&self, checkpoint: &CheckpointData) -> Vec<String> {
-        let mut removals: Vec<String> = vec![];
-
-        // Gather all object ids that got deleted.
-        // This way, we can delete removed name records
-        // (detects burning of expired names or leaf names removal).
-        let deleted_objects: HashSet<_> = checkpoint
-            .transactions
-            .iter()
-            .flat_map(|x| x.effects.all_removed_objects())
-            .map(|((id, _, _), _)| id)
+    /// Parses a list of the deletions in the checkpoint and adds them to the removals list.
+    /// Also removes any name records from the updates, if they ended up being deleted in the same checkpoint.
+    pub fn parse_record_deletions(
+        &mut self,
+        config: &SuinsIndexer,
+        transaction: &CheckpointTransaction,
+    ) {
+        // a list of all the deleted objects in the transaction.
+        let deleted_objects: HashSet<_> = transaction
+            .effects
+            .all_tombstones()
+            .into_iter()
+            .map(|(id, _)| id)
             .collect();
 
-        for input in checkpoint.input_objects() {
-            if self.is_name_record(input) && deleted_objects.contains(&input.id()) {
-                removals.push(input.id().to_string());
+        for input in transaction.input_objects.iter() {
+            if config.is_name_record(input) && deleted_objects.contains(&input.id()) {
+                // since this record was deleted, we need to remove it from the name_records hashmap.
+                // that catches a case where a name record was edited on a previous transaction in the checkpoint
+                // and deleted from a different tx later in the checkpoint.
+                self.name_records.remove(&input.id());
+
+                // add it in the list of removals
+                self.removals.insert(input.id());
             }
         }
-
-        removals
     }
 
-    /// Processes a checkpoint and produces a list of `updates` and a list of `removals`
-    ///
-    /// We can then use these to execute our DB bulk insertions + bulk deletions.
-    ///
-    /// Returns
-    /// - `Vec<VerifiedDomain>`: A list of NameRecord updates for the database (including sequence number)
-    /// - `Vec<String>`: A list of IDs to be deleted from the database (`field_id` is the matching column)
-    pub fn process_checkpoint(
-        &self,
-        checkpoint: CheckpointData,
-    ) -> (Vec<VerifiedDomain>, Vec<String>) {
-        let (name_records, subdomain_wrappers) =
-            self.parse_name_record_changes(&checkpoint.output_objects());
-
-        let removals = self.parse_name_record_deletions(&checkpoint);
-
-        let updates = prepare_db_updates(
-            &name_records,
-            &subdomain_wrappers,
-            checkpoint.checkpoint_summary.sequence_number,
-        );
-
-        (updates, removals)
+    /// Prepares a vector of `VerifiedDomain`s to be inserted into the DB, taking in account
+    /// the list of subdomain wrappers created as well as the checkpoint's sequence number.
+    pub fn prepare_db_updates(&self) -> Vec<VerifiedDomain> {
+        let mut updates: Vec<VerifiedDomain> = vec![];
+
+        for (field_id, name_record_change) in self.name_records.iter() {
+            let name_record = &name_record_change.0;
+
+            let parent = name_record.name.parent().to_string();
+            let nft_id = name_record.value.nft_id.bytes.to_string();
+
+            updates.push(VerifiedDomain {
+                field_id: field_id.to_string(),
+                name: name_record.name.to_string(),
+                parent,
+                expiration_timestamp_ms: name_record.value.expiration_timestamp_ms as i64,
+                nft_id,
+                target_address: if name_record.value.target_address.is_some() {
+                    Some(SuiAddress::to_string(
+                        &name_record.value.target_address.unwrap(),
+                    ))
+                } else {
+                    None
+                },
+                // unwrapping must be safe as `value.data` is an on-chain value with VecMap type.
+                data: serde_json::to_value(&name_record.value.data).unwrap(),
+                last_checkpoint_updated: self.checkpoint_sequence_number as i64,
+                subdomain_wrapper_id: self
+                    .subdomain_wrappers
+                    .get(&name_record.name.to_string())
+                    .cloned(),
+            });
+        }
+
+        updates
     }
 }
@@ -198,43 +245,3 @@ pub fn format_update_field_query(field: &str) -> String {
 pub fn format_update_subdomain_wrapper_query() -> String {
     "CASE WHEN excluded.subdomain_wrapper_id IS NOT NULL THEN excluded.subdomain_wrapper_id ELSE domains.subdomain_wrapper_id END".to_string()
 }
-
-/// Prepares a vector of `VerifiedDomain`s to be inserted into the DB, taking in account
-/// the list of subdomain wrappers created as well as the checkpoint's sequence number.
-pub fn prepare_db_updates(
-    name_record_changes: &HashMap<ObjectID, NameRecordChange>,
-    subdomain_wrappers: &HashMap<String, String>,
-    checkpoint_seq_num: u64,
-) -> Vec<VerifiedDomain> {
-    let mut updates: Vec<VerifiedDomain> = vec![];
-
-    for name_record_change in name_record_changes.values() {
-        let name_record = &name_record_change.field;
-
-        let parent = name_record.name.parent().to_string();
-        let nft_id = name_record.value.nft_id.bytes.to_string();
-
-        updates.push(VerifiedDomain {
-            field_id: name_record_change.field_id.to_string(),
-            name: name_record.name.to_string(),
-            parent,
-            expiration_timestamp_ms: name_record.value.expiration_timestamp_ms as i64,
-            nft_id,
-            target_address: if name_record.value.target_address.is_some() {
-                Some(SuiAddress::to_string(
-                    &name_record.value.target_address.unwrap(),
-                ))
-            } else {
-                None
-            },
-            // unwrapping must be safe as `value.data` is an on-chain value with VecMap type.
-            data: serde_json::to_value(&name_record.value.data).unwrap(),
-            last_checkpoint_updated: checkpoint_seq_num as i64,
-            subdomain_wrapper_id: subdomain_wrappers
-                .get(&name_record.name.to_string())
-                .cloned(),
-        });
-    }
-
-    updates
-}
diff --git a/crates/suins-indexer/src/main.rs b/crates/suins-indexer/src/main.rs
index 86c9256c15961..e802ed19137d1 100644
--- a/crates/suins-indexer/src/main.rs
+++ b/crates/suins-indexer/src/main.rs
@@ -3,13 +3,14 @@
 
 use anyhow::Result;
 use async_trait::async_trait;
-use diesel::{dsl::sql, Connection, ExpressionMethods, RunQueryDsl};
+use diesel::{dsl::sql, BoolExpressionMethods, Connection, ExpressionMethods, RunQueryDsl};
 use prometheus::Registry;
 use std::path::PathBuf;
 use sui_data_ingestion_core::{
     DataIngestionMetrics, FileProgressStore, IndexerExecutor, Worker, WorkerPool,
 };
 use sui_types::full_checkpoint_content::CheckpointData;
+
 use suins_indexer::{
     get_connection_pool,
     indexer::{format_update_field_query, format_update_subdomain_wrapper_query, SuinsIndexer},
@@ -37,7 +38,12 @@ impl SuinsIndexerWorker {
     /// - The second query is a bulk delete of all deletions.
     ///
     /// You can safely call this with empty updates/deletions as it will return Ok.
-    fn commit_to_db(&self, updates: &[VerifiedDomain], removals: &[String]) -> Result<()> {
+    fn commit_to_db(
+        &self,
+        updates: &[VerifiedDomain],
+        removals: &[String],
+        checkpoint_seq_num: u64,
+    ) -> Result<()> {
         if updates.is_empty() && removals.is_empty() {
             return Ok(());
         }
@@ -73,8 +79,14 @@ impl SuinsIndexerWorker {
             }
 
             if !removals.is_empty() {
+                // We want to remove from the database all name records that were removed in the checkpoint
+                // but only if the checkpoint is newer than the last time the name record was updated.
                 diesel::delete(domains::table)
-                    .filter(domains::field_id.eq_any(removals))
+                    .filter(
+                        domains::field_id
+                            .eq_any(removals)
+                            .and(domains::last_checkpoint_updated.le(checkpoint_seq_num as i64)),
+                    )
                     .execute(tx)
                     .unwrap_or_else(|_| panic!("Failed to process deletions: {:?}", removals));
             }
@@ -87,9 +99,10 @@ impl SuinsIndexerWorker {
 #[async_trait]
 impl Worker for SuinsIndexerWorker {
     async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
-        let (updates, removals) = self.indexer.process_checkpoint(checkpoint);
+        let checkpoint_seq_number = checkpoint.checkpoint_summary.sequence_number;
+        let (updates, removals) = self.indexer.process_checkpoint(&checkpoint);
 
-        self.commit_to_db(&updates, &removals)?;
+        self.commit_to_db(&updates, &removals, checkpoint_seq_number)?;
         Ok(())
     }
 }
@@ -97,10 +110,11 @@ impl Worker for SuinsIndexerWorker {
 #[tokio::main]
 async fn main() -> Result<()> {
     dotenv().ok();
-    let aws_key_id = env::var("AWS_ACCESS_KEY_ID").ok();
-    let aws_secret_access_key = env::var("AWS_ACCESS_SECRET_KEY").ok();
-    let aws_session_token = env::var("AWS_SESSION_TOKEN").ok();
-    let aws_s3_endpoint = env::var("AWS_S3_ENDPOINT").ok();
+    let (remote_storage, registry_id, subdomain_wrapper_type) = (
+        env::var("REMOTE_STORAGE").ok(),
+        env::var("REGISTRY_ID").ok(),
+        env::var("SUBDOMAIN_WRAPPER_TYPE").ok(),
+    );
     let backfill_progress_file_path =
         env::var("BACKFILL_PROGRESS_FILE_PATH").unwrap_or("/tmp/backfill_progress".to_string());
     let checkpoints_dir = env::var("CHECKPOINTS_DIR").unwrap_or("/tmp/checkpoints".to_string());
@@ -112,11 +126,18 @@ async fn main() -> Result<()> {
     let metrics = DataIngestionMetrics::new(&Registry::new());
     let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
 
+    let indexer_setup = if let (Some(registry_id), Some(subdomain_wrapper_type)) =
+        (registry_id, subdomain_wrapper_type)
+    {
+        SuinsIndexer::new(registry_id, subdomain_wrapper_type)
+    } else {
+        SuinsIndexer::default()
+    };
+
     let worker_pool = WorkerPool::new(
         SuinsIndexerWorker {
             pg_pool: get_connection_pool(),
-            // TODO(manos): This should be configurable from env.
-            indexer: SuinsIndexer::default(),
+            indexer: indexer_setup,
         },
         "suins_indexing".to_string(), /* task name used as a key in the progress store */
         100,                          /* concurrency */
@@ -126,21 +147,8 @@ async fn main() -> Result<()> {
     executor
         .run(
             PathBuf::from(checkpoints_dir), /* directory should exist but can be empty */
-            aws_s3_endpoint,                /* remote_read_endpoint: If set */
-            vec![
-                (
-                    "aws_access_key_id".to_string(),
-                    aws_key_id.unwrap_or("".to_string()),
-                ),
-                (
-                    "aws_secret_access_key".to_string(),
-                    aws_secret_access_key.unwrap_or("".to_string()),
-                ),
-                (
-                    "aws_session_token".to_string(),
-                    aws_session_token.unwrap_or("".to_string()),
-                ),
-            ], /* aws credentials */
+            remote_storage, /* remote_read_endpoint: If set */
+            vec![],         /* aws credentials */
             100,            /* remote_read_batch_size */
             exit_receiver,
         )
diff --git a/crates/suins-indexer/tests/checkpoint_process_tests.rs b/crates/suins-indexer/tests/checkpoint_process_tests.rs
index b37d1cb99b65c..34e308394e0f0 100644
--- a/crates/suins-indexer/tests/checkpoint_process_tests.rs
+++ b/crates/suins-indexer/tests/checkpoint_process_tests.rs
@@ -24,7 +24,7 @@ fn process_22279187_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22279187.chk"));
     let indexer = get_test_indexer();
 
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     // This checkpoint has no removals and adds 3 names.
     assert_eq!(removals.len(), 0);
@@ -41,7 +41,7 @@ fn process_22279187_checkpoint() {
 fn process_22279365_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22279365.chk"));
     let indexer = get_test_indexer();
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     // This checkpoint has 1 removal and 1 addition.
     assert_eq!(removals.len(), 1);
@@ -62,7 +62,7 @@ fn process_22279496_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22279496.chk"));
     let indexer = get_test_indexer();
 
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     assert_eq!(removals.len(), 0);
     assert_eq!(updates.len(), 1);
@@ -85,7 +85,7 @@ fn process_22279944_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22279944.chk"));
     let indexer = get_test_indexer();
 
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     assert_eq!(removals.len(), 0);
     assert_eq!(updates.len(), 1);
@@ -100,13 +100,17 @@ fn process_22279944_checkpoint() {
         addition.subdomain_wrapper_id,
         Some("0x9ca93181d093598b55787e82f69296819e9f779f25f1cc5226d2cd4d07126790".to_string())
     );
+    assert_eq!(
+        addition.field_id,
+        "0x79b123c73d073ba73c9e6f0817e63270d716db3c7945ecde477b22df7d026e43".to_string()
+    )
 }
 
 #[test]
 fn process_22280030_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22280030.chk"));
     let indexer = get_test_indexer();
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     assert_eq!(removals.len(), 0);
     assert_eq!(updates.len(), 1);
@@ -121,6 +125,11 @@ fn process_22280030_checkpoint() {
         addition.subdomain_wrapper_id,
         Some("0x48de1a7eef5956c4f3478849654abd94dcf5b206c631328c50518091b0eee9b0".to_string())
     );
+
+    assert_eq!(
+        addition.field_id,
+        "0x79b123c73d073ba73c9e6f0817e63270d716db3c7945ecde477b22df7d026e43".to_string()
+    )
 }
 
 /// Reads a checkpoint from a given file in the `/tests/data` directory.
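
For readers skimming the patch, a minimal usage sketch of the reworked flow follows. It only reuses names introduced in the diffs above (`SuinsIndexer`, `process_checkpoint`, `CheckpointData`); the `handle_checkpoint` helper itself is hypothetical, and the worker wiring, error handling, and Diesel commit step are omitted.

```rust
use sui_types::full_checkpoint_content::CheckpointData;
use suins_indexer::indexer::SuinsIndexer;

// Hypothetical helper: mirrors what SuinsIndexerWorker::process_checkpoint does in main.rs.
fn handle_checkpoint(indexer: &SuinsIndexer, checkpoint: &CheckpointData) {
    let checkpoint_seq_number = checkpoint.checkpoint_summary.sequence_number;

    // A single, sequenced pass over the checkpoint's transactions, so only the
    // final state per name record survives in `updates` / `removals`.
    let (updates, removals) = indexer.process_checkpoint(checkpoint);

    // In the real worker, `updates` are bulk-upserted and `removals` are deleted
    // only where `last_checkpoint_updated <= checkpoint_seq_num` (see commit_to_db).
    println!(
        "checkpoint {}: {} updates, {} removals",
        checkpoint_seq_number,
        updates.len(),
        removals.len()
    );
}
```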