diff --git a/crates/suins-indexer/.env.sample b/crates/suins-indexer/.env.sample
index b32cafb03b37e..c24ac51dd410b 100644
--- a/crates/suins-indexer/.env.sample
+++ b/crates/suins-indexer/.env.sample
@@ -1,7 +1,7 @@
 DATABASE_URL=postgres://:@localhost:5432/
-AWS_S3_ENDPOINT=https://s3..amazonaws.com/
-AWS_ACCESS_KEY_ID=
-AWS_ACCESS_SECRET_KEY=
-AWS_SESSION_TOKEN=
-BACKFILL_PROGRESS_FILE_PATH=/tmp/backfill_progress # expects a file in the format { "indexing_name": }
-CHECKPOINTS_DIR=/tmp/backfill_progress
+REMOTE_STORAGE=https://checkpoints.mainnet.sui.io
+BACKFILL_PROGRESS_FILE_PATH=/tmp/backfill_progress # expects a file in the format { "suins_indexing": }
+CHECKPOINTS_DIR=/tmp/checkpoints
+REGISTRY_ID=0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106
+SUBDOMAIN_WRAPPER_TYPE=0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106::subdomain_registration::SubDomainRegistration
+
diff --git a/crates/suins-indexer/src/indexer.rs b/crates/suins-indexer/src/indexer.rs
index c272e87eb672b..f7fb1ce489851 100644
--- a/crates/suins-indexer/src/indexer.rs
+++ b/crates/suins-indexer/src/indexer.rs
@@ -9,28 +9,23 @@ use std::{
 use move_core_types::language_storage::StructTag;
 use sui_json_rpc::name_service::{Domain, NameRecord, SubDomainRegistration};
 use sui_types::{
-    base_types::{ObjectID, SequenceNumber, SuiAddress},
+    base_types::{ObjectID, SuiAddress},
     dynamic_field::Field,
-    full_checkpoint_content::CheckpointData,
+    full_checkpoint_content::{CheckpointData, CheckpointTransaction},
     object::Object,
 };
 
 use crate::models::VerifiedDomain;
 
-/// TODO(manos): Hardcode mainnet addresses.
 const REGISTRY_TABLE_ID: &str =
     "0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106";
+/// TODO(manos): Hardcode mainnet once we publish the subdomains package.
 const SUBDOMAIN_REGISTRATION_TYPE: &str =
     "0xPackageIdTBD::subdomain_registration::SubDomainRegistration";
 
 #[derive(Debug, Clone)]
-pub struct NameRecordChange {
-    /// the NameRecord entry in the table (DF).
-    field: Field<Domain, NameRecord>,
-    /// the DF's ID.
-    field_id: ObjectID,
-    sequence_number: SequenceNumber,
-}
+pub struct NameRecordChange(Field<Domain, NameRecord>);
+
 pub struct SuinsIndexer {
     registry_table_id: SuiAddress,
     subdomain_wrapper_type: StructTag,
@@ -77,108 +72,160 @@ impl SuinsIndexer {
             .is_some_and(|owner| owner == self.registry_table_id)
     }
 
+    /// Processes a checkpoint and produces a list of `updates` and a list of `removals`
+    ///
+    /// We can then use these to execute our DB bulk insertions + bulk deletions.
+    ///
+    /// Returns
+    /// - `Vec<VerifiedDomain>`: A list of NameRecord updates for the database (including sequence number)
+    /// - `Vec<String>`: A list of IDs to be deleted from the database (`field_id` is the matching column)
+    pub fn process_checkpoint(&self, data: &CheckpointData) -> (Vec<VerifiedDomain>, Vec<String>) {
+        let mut checkpoint = SuinsIndexerCheckpoint::new(data.checkpoint_summary.sequence_number);
+
+        // loop through all the transactions in the checkpoint
+        // Since the transactions are sequenced inside the checkpoint, we can safely assume
+        // that we have the latest data for each name record in the end of the loop.
+        for transaction in &data.transactions {
+            // Add all name record changes to the name_records HashMap.
+            // Remove any removals that got re-created.
+            checkpoint.parse_record_changes(self, &transaction.output_objects);
+
+            // Gather all removals from the transaction,
+            // and delete any name records from the name_records if it got deleted.
+            checkpoint.parse_record_deletions(self, transaction);
+        }
+
+        (
+            // Convert our name_records & wrappers into a list of updates for the DB.
+            checkpoint.prepare_db_updates(),
+            checkpoint
+                .removals
+                .into_iter()
+                .map(|id| id.to_string())
+                .collect(),
+        )
+    }
+}
+
+pub struct SuinsIndexerCheckpoint {
+    /// A list of name records that have been updated in the checkpoint.
+    name_records: HashMap<ObjectID, NameRecordChange>,
+    /// A list of subdomain wrappers that have been created in the checkpoint.
+    subdomain_wrappers: HashMap<String, String>,
+    /// A list of name records that have been deleted in the checkpoint.
+    removals: HashSet<ObjectID>,
+    /// The sequence number of the checkpoint.
+    checkpoint_sequence_number: u64,
+}
+
+impl SuinsIndexerCheckpoint {
+    pub fn new(checkpoint_sequence_number: u64) -> Self {
+        Self {
+            name_records: HashMap::new(),
+            subdomain_wrappers: HashMap::new(),
+            removals: HashSet::new(),
+            checkpoint_sequence_number,
+        }
+    }
+
     /// Parses the name record changes + subdomain wraps.
     /// and pushes them into the supplied vector + hashmap.
     ///
     /// It is implemented in a way to do just a single iteration over the objects.
-    pub fn parse_name_record_changes(
-        &self,
-        objects: &[&Object],
-    ) -> (HashMap<ObjectID, NameRecordChange>, HashMap<String, String>) {
-        let mut name_records: HashMap<ObjectID, NameRecordChange> = HashMap::new();
-        let mut subdomain_wrappers: HashMap<String, String> = HashMap::new();
-
-        for &object in objects {
+    pub fn parse_record_changes(&mut self, config: &SuinsIndexer, objects: &[Object]) {
+        for object in objects {
             // Parse all the changes to a `NameRecord`
-            if self.is_name_record(object) {
+            if config.is_name_record(object) {
                 let name_record: Field<Domain, NameRecord> = object
                     .to_rust()
                     .unwrap_or_else(|| panic!("Failed to parse name record for {:?}", object));
 
-                let id: ObjectID = object.id();
-
-                // If we already have a newer version of the same name record, skip insertion.
-                // That prevents us from falling into PG's bulk insertions double conflicts.
-                if name_records
-                    .get(&id)
-                    .is_some_and(|x| x.sequence_number > object.version())
-                {
-                    continue;
-                }
-
-                name_records.insert(
-                    id,
-                    NameRecordChange {
-                        field: name_record,
-                        field_id: object.id(),
-                        sequence_number: object.version(),
-                    },
-                );
+                let id = object.id();
+
+                // Remove from the removals list if it's there.
+                // The reason it might have been there is that the same name record might have been
+                // deleted in a previous transaction in the same checkpoint, and now it got re-created.
+                self.removals.remove(&id);
+
+                self.name_records.insert(id, NameRecordChange(name_record));
             }
 
            // Parse subdomain wrappers and save them in our hashmap.
            // Later, we'll save the id of the wrapper in the name record.
            // NameRecords & their equivalent SubdomainWrappers are always created in the same PTB, so we can safely assume
            // that the wrapper will be created on the same checkpoint as the name record and vice versa.
-            if self.is_subdomain_wrapper(object) {
+            if config.is_subdomain_wrapper(object) {
                 let sub_domain: SubDomainRegistration = object.to_rust().unwrap();
-                subdomain_wrappers.insert(
+                self.subdomain_wrappers.insert(
                     sub_domain.nft.domain_name,
                     sub_domain.id.id.bytes.to_string(),
                 );
             };
         }
-
-        (name_records, subdomain_wrappers)
     }
 
-    /// For each input object, we're parsing the name record deletions
-    /// A deletion we want to track is a deleted object which is of `NameRecord` type.
-    /// Domain replacements do not count as deletions, but instead are an update to the latest state.
-    pub fn parse_name_record_deletions(&self, checkpoint: &CheckpointData) -> Vec<String> {
-        let mut removals: Vec<String> = vec![];
-
-        // Gather all object ids that got deleted.
-        // This way, we can delete removed name records
-        // (detects burning of expired names or leaf names removal).
-        let deleted_objects: HashSet<_> = checkpoint
-            .transactions
-            .iter()
-            .flat_map(|x| x.effects.all_removed_objects())
-            .map(|((id, _, _), _)| id)
+    /// Parses a list of the deletions in the checkpoint and adds them to the removals list.
+    /// Also removes any name records from the updates, if they ended up being deleted in the same checkpoint.
+    pub fn parse_record_deletions(
+        &mut self,
+        config: &SuinsIndexer,
+        transaction: &CheckpointTransaction,
+    ) {
+        // a list of all the deleted objects in the transaction.
+        let deleted_objects: HashSet<_> = transaction
+            .effects
+            .all_tombstones()
+            .into_iter()
+            .map(|(id, _)| id)
             .collect();
 
-        for input in checkpoint.input_objects() {
-            if self.is_name_record(input) && deleted_objects.contains(&input.id()) {
-                removals.push(input.id().to_string());
+        for input in transaction.input_objects.iter() {
+            if config.is_name_record(input) && deleted_objects.contains(&input.id()) {
+                // since this record was deleted, we need to remove it from the name_records hashmap.
+                // that catches a case where a name record was edited on a previous transaction in the checkpoint
+                // and deleted from a different tx later in the checkpoint.
+                self.name_records.remove(&input.id());
+
+                // add it in the list of removals
+                self.removals.insert(input.id());
             }
         }
-
-        removals
     }
 
-    /// Processes a checkpoint and produces a list of `updates` and a list of `removals`
-    ///
-    /// We can then use these to execute our DB bulk insertions + bulk deletions.
-    ///
-    /// Returns
-    /// - `Vec<VerifiedDomain>`: A list of NameRecord updates for the database (including sequence number)
-    /// - `Vec<String>`: A list of IDs to be deleted from the database (`field_id` is the matching column)
-    pub fn process_checkpoint(
-        &self,
-        checkpoint: CheckpointData,
-    ) -> (Vec<VerifiedDomain>, Vec<String>) {
-        let (name_records, subdomain_wrappers) =
-            self.parse_name_record_changes(&checkpoint.output_objects());
-
-        let removals = self.parse_name_record_deletions(&checkpoint);
-
-        let updates = prepare_db_updates(
-            &name_records,
-            &subdomain_wrappers,
-            checkpoint.checkpoint_summary.sequence_number,
-        );
-
-        (updates, removals)
+    /// Prepares a vector of `VerifiedDomain`s to be inserted into the DB, taking in account
+    /// the list of subdomain wrappers created as well as the checkpoint's sequence number.
+    pub fn prepare_db_updates(&self) -> Vec<VerifiedDomain> {
+        let mut updates: Vec<VerifiedDomain> = vec![];
+
+        for (field_id, name_record_change) in self.name_records.iter() {
+            let name_record = &name_record_change.0;
+
+            let parent = name_record.name.parent().to_string();
+            let nft_id = name_record.value.nft_id.bytes.to_string();
+
+            updates.push(VerifiedDomain {
+                field_id: field_id.to_string(),
+                name: name_record.name.to_string(),
+                parent,
+                expiration_timestamp_ms: name_record.value.expiration_timestamp_ms as i64,
+                nft_id,
+                target_address: if name_record.value.target_address.is_some() {
+                    Some(SuiAddress::to_string(
+                        &name_record.value.target_address.unwrap(),
+                    ))
+                } else {
+                    None
+                },
+                // unwrapping must be safe as `value.data` is an on-chain value with VecMap type.
+                data: serde_json::to_value(&name_record.value.data).unwrap(),
+                last_checkpoint_updated: self.checkpoint_sequence_number as i64,
+                subdomain_wrapper_id: self
+                    .subdomain_wrappers
+                    .get(&name_record.name.to_string())
+                    .cloned(),
+            });
+        }
+
+        updates
     }
 }
 
@@ -198,43 +245,3 @@ pub fn format_update_field_query(field: &str) -> String {
 pub fn format_update_subdomain_wrapper_query() -> String {
     "CASE WHEN excluded.subdomain_wrapper_id IS NOT NULL THEN excluded.subdomain_wrapper_id ELSE domains.subdomain_wrapper_id END".to_string()
 }
-
-/// Prepares a vector of `VerifiedDomain`s to be inserted into the DB, taking in account
-/// the list of subdomain wrappers created as well as the checkpoint's sequence number.
-pub fn prepare_db_updates(
-    name_record_changes: &HashMap<ObjectID, NameRecordChange>,
-    subdomain_wrappers: &HashMap<String, String>,
-    checkpoint_seq_num: u64,
-) -> Vec<VerifiedDomain> {
-    let mut updates: Vec<VerifiedDomain> = vec![];
-
-    for name_record_change in name_record_changes.values() {
-        let name_record = &name_record_change.field;
-
-        let parent = name_record.name.parent().to_string();
-        let nft_id = name_record.value.nft_id.bytes.to_string();
-
-        updates.push(VerifiedDomain {
-            field_id: name_record_change.field_id.to_string(),
-            name: name_record.name.to_string(),
-            parent,
-            expiration_timestamp_ms: name_record.value.expiration_timestamp_ms as i64,
-            nft_id,
-            target_address: if name_record.value.target_address.is_some() {
-                Some(SuiAddress::to_string(
-                    &name_record.value.target_address.unwrap(),
-                ))
-            } else {
-                None
-            },
-            // unwrapping must be safe as `value.data` is an on-chain value with VecMap type.
-            data: serde_json::to_value(&name_record.value.data).unwrap(),
-            last_checkpoint_updated: checkpoint_seq_num as i64,
-            subdomain_wrapper_id: subdomain_wrappers
-                .get(&name_record.name.to_string())
-                .cloned(),
-        });
-    }
-
-    updates
-}
diff --git a/crates/suins-indexer/src/main.rs b/crates/suins-indexer/src/main.rs
index 86c9256c15961..e802ed19137d1 100644
--- a/crates/suins-indexer/src/main.rs
+++ b/crates/suins-indexer/src/main.rs
@@ -3,13 +3,14 @@
 
 use anyhow::Result;
 use async_trait::async_trait;
-use diesel::{dsl::sql, Connection, ExpressionMethods, RunQueryDsl};
+use diesel::{dsl::sql, BoolExpressionMethods, Connection, ExpressionMethods, RunQueryDsl};
 use prometheus::Registry;
 use std::path::PathBuf;
 use sui_data_ingestion_core::{
     DataIngestionMetrics, FileProgressStore, IndexerExecutor, Worker, WorkerPool,
 };
 use sui_types::full_checkpoint_content::CheckpointData;
+
 use suins_indexer::{
     get_connection_pool,
     indexer::{format_update_field_query, format_update_subdomain_wrapper_query, SuinsIndexer},
@@ -37,7 +38,12 @@ impl SuinsIndexerWorker {
     /// - The second query is a bulk delete of all deletions.
     ///
     /// You can safely call this with empty updates/deletions as it will return Ok.
-    fn commit_to_db(&self, updates: &[VerifiedDomain], removals: &[String]) -> Result<()> {
+    fn commit_to_db(
+        &self,
+        updates: &[VerifiedDomain],
+        removals: &[String],
+        checkpoint_seq_num: u64,
+    ) -> Result<()> {
         if updates.is_empty() && removals.is_empty() {
             return Ok(());
         }
@@ -73,8 +79,14 @@ impl SuinsIndexerWorker {
         }
 
         if !removals.is_empty() {
+            // We want to remove from the database all name records that were removed in the checkpoint
+            // but only if the checkpoint is newer than the last time the name record was updated.
             diesel::delete(domains::table)
-                .filter(domains::field_id.eq_any(removals))
+                .filter(
+                    domains::field_id
+                        .eq_any(removals)
+                        .and(domains::last_checkpoint_updated.le(checkpoint_seq_num as i64)),
+                )
                 .execute(tx)
                 .unwrap_or_else(|_| panic!("Failed to process deletions: {:?}", removals));
         }
@@ -87,9 +99,10 @@ impl SuinsIndexerWorker {
 #[async_trait]
 impl Worker for SuinsIndexerWorker {
     async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
-        let (updates, removals) = self.indexer.process_checkpoint(checkpoint);
+        let checkpoint_seq_number = checkpoint.checkpoint_summary.sequence_number;
+        let (updates, removals) = self.indexer.process_checkpoint(&checkpoint);
 
-        self.commit_to_db(&updates, &removals)?;
+        self.commit_to_db(&updates, &removals, checkpoint_seq_number)?;
         Ok(())
     }
 }
@@ -97,10 +110,11 @@ impl Worker for SuinsIndexerWorker {
 #[tokio::main]
 async fn main() -> Result<()> {
     dotenv().ok();
-    let aws_key_id = env::var("AWS_ACCESS_KEY_ID").ok();
-    let aws_secret_access_key = env::var("AWS_ACCESS_SECRET_KEY").ok();
-    let aws_session_token = env::var("AWS_SESSION_TOKEN").ok();
-    let aws_s3_endpoint = env::var("AWS_S3_ENDPOINT").ok();
+    let (remote_storage, registry_id, subdomain_wrapper_type) = (
+        env::var("REMOTE_STORAGE").ok(),
+        env::var("REGISTRY_ID").ok(),
+        env::var("SUBDOMAIN_WRAPPER_TYPE").ok(),
+    );
     let backfill_progress_file_path =
         env::var("BACKFILL_PROGRESS_FILE_PATH").unwrap_or("/tmp/backfill_progress".to_string());
     let checkpoints_dir = env::var("CHECKPOINTS_DIR").unwrap_or("/tmp/checkpoints".to_string());
@@ -112,11 +126,18 @@ async fn main() -> Result<()> {
     let metrics = DataIngestionMetrics::new(&Registry::new());
     let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
 
+    let indexer_setup = if let (Some(registry_id), Some(subdomain_wrapper_type)) =
+        (registry_id, subdomain_wrapper_type)
+    {
+        SuinsIndexer::new(registry_id, subdomain_wrapper_type)
+    } else {
+        SuinsIndexer::default()
+    };
+
     let worker_pool = WorkerPool::new(
         SuinsIndexerWorker {
             pg_pool: get_connection_pool(),
-            // TODO(manos): This should be configurable from env.
-            indexer: SuinsIndexer::default(),
+            indexer: indexer_setup,
         },
         "suins_indexing".to_string(), /* task name used as a key in the progress store */
         100, /* concurrency */
@@ -126,21 +147,8 @@ async fn main() -> Result<()> {
     executor
         .run(
             PathBuf::from(checkpoints_dir), /* directory should exist but can be empty */
-            aws_s3_endpoint, /* remote_read_endpoint: If set */
-            vec![
-                (
-                    "aws_access_key_id".to_string(),
-                    aws_key_id.unwrap_or("".to_string()),
-                ),
-                (
-                    "aws_secret_access_key".to_string(),
-                    aws_secret_access_key.unwrap_or("".to_string()),
-                ),
-                (
-                    "aws_session_token".to_string(),
-                    aws_session_token.unwrap_or("".to_string()),
-                ),
-            ], /* aws credentials */
+            remote_storage, /* remote_read_endpoint: If set */
+            vec![], /* aws credentials */
             100, /* remote_read_batch_size */
             exit_receiver,
         )
diff --git a/crates/suins-indexer/tests/checkpoint_process_tests.rs b/crates/suins-indexer/tests/checkpoint_process_tests.rs
index b37d1cb99b65c..34e308394e0f0 100644
--- a/crates/suins-indexer/tests/checkpoint_process_tests.rs
+++ b/crates/suins-indexer/tests/checkpoint_process_tests.rs
@@ -24,7 +24,7 @@ fn process_22279187_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22279187.chk"));
     let indexer = get_test_indexer();
 
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     // This checkpoint has no removals and adds 3 names.
     assert_eq!(removals.len(), 0);
@@ -41,7 +41,7 @@ fn process_22279187_checkpoint() {
 fn process_22279365_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22279365.chk"));
     let indexer = get_test_indexer();
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     // This checkpoint has 1 removal and 1 addition.
     assert_eq!(removals.len(), 1);
@@ -62,7 +62,7 @@ fn process_22279365_checkpoint() {
 fn process_22279496_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22279496.chk"));
     let indexer = get_test_indexer();
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     assert_eq!(removals.len(), 0);
     assert_eq!(updates.len(), 1);
@@ -85,7 +85,7 @@ fn process_22279496_checkpoint() {
 fn process_22279944_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22279944.chk"));
     let indexer = get_test_indexer();
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     assert_eq!(removals.len(), 0);
     assert_eq!(updates.len(), 1);
@@ -100,13 +100,17 @@ fn process_22279944_checkpoint() {
         addition.subdomain_wrapper_id,
         Some("0x9ca93181d093598b55787e82f69296819e9f779f25f1cc5226d2cd4d07126790".to_string())
     );
+    assert_eq!(
+        addition.field_id,
+        "0x79b123c73d073ba73c9e6f0817e63270d716db3c7945ecde477b22df7d026e43".to_string()
+    )
 }
 
 #[test]
 fn process_22280030_checkpoint() {
     let checkpoint = read_checkpoint_from_file(include_bytes!("data/22280030.chk"));
     let indexer = get_test_indexer();
-    let (updates, removals) = indexer.process_checkpoint(checkpoint.clone());
+    let (updates, removals) = indexer.process_checkpoint(&checkpoint);
 
     assert_eq!(removals.len(), 0);
     assert_eq!(updates.len(), 1);
@@ -121,6 +125,11 @@ fn process_22280030_checkpoint() {
         addition.subdomain_wrapper_id,
         Some("0x48de1a7eef5956c4f3478849654abd94dcf5b206c631328c50518091b0eee9b0".to_string())
     );
+
+    assert_eq!(
+        addition.field_id,
+        "0x79b123c73d073ba73c9e6f0817e63270d716db3c7945ecde477b22df7d026e43".to_string()
+    )
 }
 
 /// Reads a checkpoint from a given file in the `/tests/data` directory.
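
The tests above call a `get_test_indexer()` helper whose definition lies outside this diff. As a rough sketch only, such a helper could build the indexer from the new `REGISTRY_ID` and `SUBDOMAIN_WRAPPER_TYPE` variables added to .env.sample, mirroring the fallback logic in main.rs; the helper name, env handling, and fallback shown here are assumptions, not part of this change.

// Hypothetical test helper (assumption, not taken from this diff): build a
// SuinsIndexer from the env variables introduced above, or fall back to the
// hardcoded defaults via SuinsIndexer::default().
use std::env;
use suins_indexer::indexer::SuinsIndexer;

fn get_test_indexer() -> SuinsIndexer {
    match (
        env::var("REGISTRY_ID").ok(),
        env::var("SUBDOMAIN_WRAPPER_TYPE").ok(),
    ) {
        // Both variables set: configure the indexer explicitly, as main.rs does.
        (Some(registry_id), Some(wrapper_type)) => SuinsIndexer::new(registry_id, wrapper_type),
        // Otherwise use the defaults compiled into the crate.
        _ => SuinsIndexer::default(),
    }
}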