# [SuiNS Indexer] Dynamic config & edge case fix (#16223)
## Description 

This PR:
1. Allows dynamic config for the registry + subdomain wrapper ID via the env
file (so we can deploy on different networks).
2. Gets rid of all S3-related envs; we can just use the public checkpoint
endpoint.
3. Rewrites the logic to use a single, sequenced loop over all transactions,
so that at the end of each checkpoint we only keep the LAST state for
updates/deletions per name record (see the sketch below).
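
To make (3) concrete, here is a minimal sketch of the last-write-wins bookkeeping, using simplified `String` keys and values; the real types (`SuinsIndexerCheckpoint`, `NameRecordChange`) are in the `indexer.rs` diff below:

```rust
use std::collections::{HashMap, HashSet};

/// Simplified model of the per-checkpoint state the indexer accumulates.
#[derive(Default)]
struct CheckpointState {
    updates: HashMap<String, String>, // field_id -> latest record state
    removals: HashSet<String>,        // field_ids deleted in this checkpoint
}

impl CheckpointState {
    /// An update (or re-creation) cancels any earlier deletion of the same record.
    fn record_update(&mut self, id: &str, state: &str) {
        self.removals.remove(id);
        self.updates.insert(id.to_owned(), state.to_owned());
    }

    /// A deletion cancels any earlier update of the same record.
    fn record_deletion(&mut self, id: &str) {
        self.updates.remove(id);
        self.removals.insert(id.to_owned());
    }
}
```

Replaying a checkpoint's transactions in order through these two methods guarantees that each record ends up in at most one of the two sets by the end of the checkpoint.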

## Test Plan 

Re-run the existing tests:
```
crates/suins-indexer$ cargo test
```

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
manolisliolios authored Feb 14, 2024
1 parent 1f84041 commit 9495851
Showing 4 changed files with 185 additions and 161 deletions.
12 changes: 6 additions & 6 deletions crates/suins-indexer/.env.sample

```diff
@@ -1,7 +1,7 @@
 DATABASE_URL=postgres://<pg_user>:<pg_password>@localhost:5432/<db_name>
-AWS_S3_ENDPOINT=https://s3.<region>.amazonaws.com/<bucket_name>
-AWS_ACCESS_KEY_ID=
-AWS_ACCESS_SECRET_KEY=
-AWS_SESSION_TOKEN=
-BACKFILL_PROGRESS_FILE_PATH=/tmp/backfill_progress # expects a file in the format { "indexing_name": <checkpoint> }
-CHECKPOINTS_DIR=/tmp/backfill_progress
+REMOTE_STORAGE=https://checkpoints.mainnet.sui.io
+BACKFILL_PROGRESS_FILE_PATH=/tmp/backfill_progress # expects a file in the format { "suins_indexing": <checkpoint> }
+CHECKPOINTS_DIR=/tmp/checkpoints
+REGISTRY_ID=0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106
+SUBDOMAIN_WRAPPER_TYPE=0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106::subdomain_registration::SubDomainRegistration
```
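
For context, these two new variables get parsed into the typed fields the indexer holds (`SuiAddress` and `StructTag`, per the `indexer.rs` diff below). A minimal sketch, assuming the env file has already been loaded and that both types' `FromStr` impls are used; the helper name is hypothetical:

```rust
use std::{env, str::FromStr};

use move_core_types::language_storage::StructTag;
use sui_types::base_types::SuiAddress;

/// Hypothetical helper: build the indexer config from the environment
/// instead of the hardcoded constants this PR removes.
fn indexer_config_from_env() -> (SuiAddress, StructTag) {
    let registry_table_id =
        SuiAddress::from_str(&env::var("REGISTRY_ID").expect("REGISTRY_ID must be set"))
            .expect("REGISTRY_ID must be a valid Sui address");

    let subdomain_wrapper_type = StructTag::from_str(
        &env::var("SUBDOMAIN_WRAPPER_TYPE").expect("SUBDOMAIN_WRAPPER_TYPE must be set"),
    )
    .expect("SUBDOMAIN_WRAPPER_TYPE must be a valid struct tag");

    (registry_table_id, subdomain_wrapper_type)
}
```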

255 changes: 131 additions & 124 deletions crates/suins-indexer/src/indexer.rs
```diff
@@ -9,28 +9,23 @@ use std::{
 use move_core_types::language_storage::StructTag;
 use sui_json_rpc::name_service::{Domain, NameRecord, SubDomainRegistration};
 use sui_types::{
-    base_types::{ObjectID, SequenceNumber, SuiAddress},
+    base_types::{ObjectID, SuiAddress},
     dynamic_field::Field,
-    full_checkpoint_content::CheckpointData,
+    full_checkpoint_content::{CheckpointData, CheckpointTransaction},
     object::Object,
 };
 
 use crate::models::VerifiedDomain;
 
-/// TODO(manos): Hardcode mainnet addresses.
-const REGISTRY_TABLE_ID: &str =
-    "0xe64cd9db9f829c6cc405d9790bd71567ae07259855f4fba6f02c84f52298c106";
-/// TODO(manos): Hardcode mainnet once we publish the subdomains package.
-const SUBDOMAIN_REGISTRATION_TYPE: &str =
-    "0xPackageIdTBD::subdomain_registration::SubDomainRegistration";
-
 #[derive(Debug, Clone)]
-pub struct NameRecordChange {
-    /// the NameRecord entry in the table (DF).
-    field: Field<Domain, NameRecord>,
-    /// the DF's ID.
-    field_id: ObjectID,
-    sequence_number: SequenceNumber,
-}
+pub struct NameRecordChange(Field<Domain, NameRecord>);
 
 pub struct SuinsIndexer {
     registry_table_id: SuiAddress,
     subdomain_wrapper_type: StructTag,
@@ -77,108 +72,160 @@ impl SuinsIndexer {
             .is_some_and(|owner| owner == self.registry_table_id)
     }
 
+    /// Processes a checkpoint and produces a list of `updates` and a list of `removals`
+    ///
+    /// We can then use these to execute our DB bulk insertions + bulk deletions.
+    ///
+    /// Returns
+    /// - `Vec<VerifiedDomain>`: A list of NameRecord updates for the database (including sequence number)
+    /// - `Vec<String>`: A list of IDs to be deleted from the database (`field_id` is the matching column)
+    pub fn process_checkpoint(&self, data: &CheckpointData) -> (Vec<VerifiedDomain>, Vec<String>) {
+        let mut checkpoint = SuinsIndexerCheckpoint::new(data.checkpoint_summary.sequence_number);
+
+        // loop through all the transactions in the checkpoint
+        // Since the transactions are sequenced inside the checkpoint, we can safely assume
+        // that we have the latest data for each name record in the end of the loop.
+        for transaction in &data.transactions {
+            // Add all name record changes to the name_records HashMap.
+            // Remove any removals that got re-created.
+            checkpoint.parse_record_changes(self, &transaction.output_objects);
+
+            // Gather all removals from the transaction,
+            // and delete any name records from the name_records if it got deleted.
+            checkpoint.parse_record_deletions(self, transaction);
+        }
+
+        (
+            // Convert our name_records & wrappers into a list of updates for the DB.
+            checkpoint.prepare_db_updates(),
+            checkpoint
+                .removals
+                .into_iter()
+                .map(|id| id.to_string())
+                .collect(),
+        )
+    }
+}
+
+pub struct SuinsIndexerCheckpoint {
+    /// A list of name records that have been updated in the checkpoint.
+    name_records: HashMap<ObjectID, NameRecordChange>,
+    /// A list of subdomain wrappers that have been created in the checkpoint.
+    subdomain_wrappers: HashMap<String, String>,
+    /// A list of name records that have been deleted in the checkpoint.
+    removals: HashSet<ObjectID>,
+    /// The sequence number of the checkpoint.
+    checkpoint_sequence_number: u64,
+}
+
+impl SuinsIndexerCheckpoint {
+    pub fn new(checkpoint_sequence_number: u64) -> Self {
+        Self {
+            name_records: HashMap::new(),
+            subdomain_wrappers: HashMap::new(),
+            removals: HashSet::new(),
+            checkpoint_sequence_number,
+        }
+    }
+
     /// Parses the name record changes + subdomain wraps.
     /// and pushes them into the supplied vector + hashmap.
     ///
     /// It is implemented in a way to do just a single iteration over the objects.
-    pub fn parse_name_record_changes(
-        &self,
-        objects: &[&Object],
-    ) -> (HashMap<ObjectID, NameRecordChange>, HashMap<String, String>) {
-        let mut name_records: HashMap<ObjectID, NameRecordChange> = HashMap::new();
-        let mut subdomain_wrappers: HashMap<String, String> = HashMap::new();
-
-        for &object in objects {
+    pub fn parse_record_changes(&mut self, config: &SuinsIndexer, objects: &[Object]) {
+        for object in objects {
             // Parse all the changes to a `NameRecord`
-            if self.is_name_record(object) {
+            if config.is_name_record(object) {
                 let name_record: Field<Domain, NameRecord> = object
                     .to_rust()
                     .unwrap_or_else(|| panic!("Failed to parse name record for {:?}", object));
 
-                let id: ObjectID = object.id();
-
-                // If we already have a newer version of the same name record, skip insertion.
-                // That prevents us from falling into PG's bulk insertions double conflicts.
-                if name_records
-                    .get(&id)
-                    .is_some_and(|x| x.sequence_number > object.version())
-                {
-                    continue;
-                }
-
-                name_records.insert(
-                    id,
-                    NameRecordChange {
-                        field: name_record,
-                        field_id: object.id(),
-                        sequence_number: object.version(),
-                    },
-                );
+                let id = object.id();
+
+                // Remove from the removals list if it's there.
+                // The reason it might have been there is that the same name record might have been
+                // deleted in a previous transaction in the same checkpoint, and now it got re-created.
+                self.removals.remove(&id);
+
+                self.name_records.insert(id, NameRecordChange(name_record));
             }
             // Parse subdomain wrappers and save them in our hashmap.
             // Later, we'll save the id of the wrapper in the name record.
             // NameRecords & their equivalent SubdomainWrappers are always created in the same PTB, so we can safely assume
             // that the wrapper will be created on the same checkpoint as the name record and vice versa.
-            if self.is_subdomain_wrapper(object) {
+            if config.is_subdomain_wrapper(object) {
                 let sub_domain: SubDomainRegistration = object.to_rust().unwrap();
-                subdomain_wrappers.insert(
+                self.subdomain_wrappers.insert(
                     sub_domain.nft.domain_name,
                     sub_domain.id.id.bytes.to_string(),
                 );
             };
         }
-
-        (name_records, subdomain_wrappers)
     }
 
-    /// For each input object, we're parsing the name record deletions
-    /// A deletion we want to track is a deleted object which is of `NameRecord` type.
-    /// Domain replacements do not count as deletions, but instead are an update to the latest state.
-    pub fn parse_name_record_deletions(&self, checkpoint: &CheckpointData) -> Vec<String> {
-        let mut removals: Vec<String> = vec![];
-
-        // Gather all object ids that got deleted.
-        // This way, we can delete removed name records
-        // (detects burning of expired names or leaf names removal).
-        let deleted_objects: HashSet<_> = checkpoint
-            .transactions
-            .iter()
-            .flat_map(|x| x.effects.all_removed_objects())
-            .map(|((id, _, _), _)| id)
+    /// Parses a list of the deletions in the checkpoint and adds them to the removals list.
+    /// Also removes any name records from the updates, if they ended up being deleted in the same checkpoint.
+    pub fn parse_record_deletions(
+        &mut self,
+        config: &SuinsIndexer,
+        transaction: &CheckpointTransaction,
+    ) {
+        // a list of all the deleted objects in the transaction.
+        let deleted_objects: HashSet<_> = transaction
+            .effects
+            .all_tombstones()
+            .into_iter()
+            .map(|(id, _)| id)
             .collect();
 
-        for input in checkpoint.input_objects() {
-            if self.is_name_record(input) && deleted_objects.contains(&input.id()) {
-                removals.push(input.id().to_string());
+        for input in transaction.input_objects.iter() {
+            if config.is_name_record(input) && deleted_objects.contains(&input.id()) {
+                // since this record was deleted, we need to remove it from the name_records hashmap.
+                // that catches a case where a name record was edited on a previous transaction in the checkpoint
+                // and deleted from a different tx later in the checkpoint.
+                self.name_records.remove(&input.id());
+
+                // add it in the list of removals
+                self.removals.insert(input.id());
             }
         }
-
-        removals
     }
 
-    /// Processes a checkpoint and produces a list of `updates` and a list of `removals`
-    ///
-    /// We can then use these to execute our DB bulk insertions + bulk deletions.
-    ///
-    /// Returns
-    /// - `Vec<VerifiedDomain>`: A list of NameRecord updates for the database (including sequence number)
-    /// - `Vec<String>`: A list of IDs to be deleted from the database (`field_id` is the matching column)
-    pub fn process_checkpoint(
-        &self,
-        checkpoint: CheckpointData,
-    ) -> (Vec<VerifiedDomain>, Vec<String>) {
-        let (name_records, subdomain_wrappers) =
-            self.parse_name_record_changes(&checkpoint.output_objects());
-
-        let removals = self.parse_name_record_deletions(&checkpoint);
-
-        let updates = prepare_db_updates(
-            &name_records,
-            &subdomain_wrappers,
-            checkpoint.checkpoint_summary.sequence_number,
-        );
-
-        (updates, removals)
+    /// Prepares a vector of `VerifiedDomain`s to be inserted into the DB, taking in account
+    /// the list of subdomain wrappers created as well as the checkpoint's sequence number.
+    pub fn prepare_db_updates(&self) -> Vec<VerifiedDomain> {
+        let mut updates: Vec<VerifiedDomain> = vec![];
+
+        for (field_id, name_record_change) in self.name_records.iter() {
+            let name_record = &name_record_change.0;
+
+            let parent = name_record.name.parent().to_string();
+            let nft_id = name_record.value.nft_id.bytes.to_string();
+
+            updates.push(VerifiedDomain {
+                field_id: field_id.to_string(),
+                name: name_record.name.to_string(),
+                parent,
+                expiration_timestamp_ms: name_record.value.expiration_timestamp_ms as i64,
+                nft_id,
+                target_address: if name_record.value.target_address.is_some() {
+                    Some(SuiAddress::to_string(
+                        &name_record.value.target_address.unwrap(),
+                    ))
+                } else {
+                    None
+                },
+                // unwrapping must be safe as `value.data` is an on-chain value with VecMap<String,String> type.
+                data: serde_json::to_value(&name_record.value.data).unwrap(),
+                last_checkpoint_updated: self.checkpoint_sequence_number as i64,
+                subdomain_wrapper_id: self
+                    .subdomain_wrappers
+                    .get(&name_record.name.to_string())
+                    .cloned(),
+            });
+        }
+
+        updates
     }
 }
 
@@ -198,43 +245,3 @@ pub fn format_update_field_query(field: &str) -> String {
 pub fn format_update_subdomain_wrapper_query() -> String {
     "CASE WHEN excluded.subdomain_wrapper_id IS NOT NULL THEN excluded.subdomain_wrapper_id ELSE domains.subdomain_wrapper_id END".to_string()
 }
-
-/// Prepares a vector of `VerifiedDomain`s to be inserted into the DB, taking in account
-/// the list of subdomain wrappers created as well as the checkpoint's sequence number.
-pub fn prepare_db_updates(
-    name_record_changes: &HashMap<ObjectID, NameRecordChange>,
-    subdomain_wrappers: &HashMap<String, String>,
-    checkpoint_seq_num: u64,
-) -> Vec<VerifiedDomain> {
-    let mut updates: Vec<VerifiedDomain> = vec![];
-
-    for name_record_change in name_record_changes.values() {
-        let name_record = &name_record_change.field;
-
-        let parent = name_record.name.parent().to_string();
-        let nft_id = name_record.value.nft_id.bytes.to_string();
-
-        updates.push(VerifiedDomain {
-            field_id: name_record_change.field_id.to_string(),
-            name: name_record.name.to_string(),
-            parent,
-            expiration_timestamp_ms: name_record.value.expiration_timestamp_ms as i64,
-            nft_id,
-            target_address: if name_record.value.target_address.is_some() {
-                Some(SuiAddress::to_string(
-                    &name_record.value.target_address.unwrap(),
-                ))
-            } else {
-                None
-            },
-            // unwrapping must be safe as `value.data` is an on-chain value with VecMap<String,String> type.
-            data: serde_json::to_value(&name_record.value.data).unwrap(),
-            last_checkpoint_updated: checkpoint_seq_num as i64,
-            subdomain_wrapper_id: subdomain_wrappers
-                .get(&name_record.name.to_string())
-                .cloned(),
-        });
-    }
-
-    updates
-}
```
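
For reference, a sketch of the call shape the reworked API ends up with; the driver function and the `suins_indexer::indexer` module path are assumptions, and the actual DB bulk insert/delete step is elided:

```rust
use sui_types::full_checkpoint_content::CheckpointData;
use suins_indexer::indexer::SuinsIndexer;

/// Hypothetical driver: one call per checkpoint yields the final state,
/// ready for one bulk upsert and one bulk delete against the DB.
fn handle_checkpoint(indexer: &SuinsIndexer, data: &CheckpointData) {
    let (updates, removals) = indexer.process_checkpoint(data);

    println!(
        "checkpoint {}: {} upserts, {} deletions",
        data.checkpoint_summary.sequence_number,
        updates.len(),
        removals.len(),
    );
}
```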