From bd0dc2fb9b74ca3cfdc3922bc63db2fb8d6098dc Mon Sep 17 00:00:00 2001 From: Ge Gao <106119108+gegaowp@users.noreply.github.com> Date: Thu, 22 Aug 2024 16:42:51 -0400 Subject: [PATCH] indexer fix: multiple object mutations in one checkpoint (#18991) ## Description per Xun's report https://linear.app/mysten-labs/issue/DP-43/bug-epochendindexingobjectstore-might-contain-multiple-versions-of ## Test plan CI ideally in the long run we want to have an embedded DB and test it on CI with a test, but we lack that today. --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- crates/sui-core/src/rest_index.rs | 2 +- .../src/handlers/checkpoint_handler.rs | 225 ++++++------------ .../sui-indexer/src/handlers/tx_processor.rs | 4 +- .../sui-types/src/full_checkpoint_content.rs | 51 +++- 4 files changed, 115 insertions(+), 167 deletions(-) diff --git a/crates/sui-core/src/rest_index.rs b/crates/sui-core/src/rest_index.rs index beb762c6d6a7f..e999cfe2abc2e 100644 --- a/crates/sui-core/src/rest_index.rs +++ b/crates/sui-core/src/rest_index.rs @@ -406,7 +406,7 @@ impl IndexStoreTables { for tx in &checkpoint.transactions { // determine changes from removed objects - for removed_object in tx.removed_objects() { + for removed_object in tx.removed_objects_pre_version() { match removed_object.owner() { Owner::AddressOwner(address) => { let owner_key = OwnerIndexKey::new(*address, removed_object.id()); diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index 7a8f8efe67fa9..a5273f6302e80 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -1,51 +1,45 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::handlers::committer::start_tx_checkpoint_commit_task; -use crate::handlers::tx_processor::IndexingPackageBuffer; -use crate::models::display::StoredDisplay; +use std::collections::{BTreeMap, HashMap}; +use std::sync::{Arc, Mutex}; + use async_trait::async_trait; +use diesel::r2d2::R2D2Connection; use itertools::Itertools; +use tap::tap::TapFallible; +use tokio::sync::watch; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn}; + use move_core_types::annotated_value::{MoveStructLayout, MoveTypeLayout}; use move_core_types::language_storage::{StructTag, TypeTag}; use mysten_metrics::{get_metrics, spawn_monitored_task}; -use std::collections::{BTreeMap, HashMap}; -use std::sync::{Arc, Mutex}; +use sui_data_ingestion_core::Worker; +use sui_json_rpc_types::SuiMoveValue; use sui_package_resolver::{PackageStore, PackageStoreWithLruCache, Resolver}; use sui_rest_api::{CheckpointData, CheckpointTransaction}; -use sui_types::base_types::ObjectRef; +use sui_types::base_types::ObjectID; use sui_types::dynamic_field::DynamicFieldInfo; use sui_types::dynamic_field::DynamicFieldName; use sui_types::dynamic_field::DynamicFieldType; +use sui_types::effects::TransactionEffectsAPI; +use sui_types::event::SystemEpochInfoEvent; use sui_types::messages_checkpoint::{ CertifiedCheckpointSummary, CheckpointContents, CheckpointSequenceNumber, }; use sui_types::object::Object; -use tokio_util::sync::CancellationToken; - -use tokio::sync::watch; - -use diesel::r2d2::R2D2Connection; -use std::collections::hash_map::Entry; -use std::collections::HashSet; -use sui_data_ingestion_core::Worker; -use sui_json_rpc_types::SuiMoveValue; -use sui_types::base_types::SequenceNumber; -use sui_types::effects::{TransactionEffects, TransactionEffectsAPI}; -use sui_types::event::SystemEpochInfoEvent; use sui_types::object::Owner; -use sui_types::transaction::TransactionDataAPI; -use tap::tap::TapFallible; -use tracing::{info, warn}; - -use sui_types::base_types::ObjectID; use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; use sui_types::sui_system_state::{get_sui_system_state, SuiSystemStateTrait}; +use sui_types::transaction::TransactionDataAPI; +use crate::db::ConnectionPool; use crate::errors::IndexerError; +use crate::handlers::committer::start_tx_checkpoint_commit_task; +use crate::handlers::tx_processor::IndexingPackageBuffer; use crate::metrics::IndexerMetrics; - -use crate::db::ConnectionPool; +use crate::models::display::StoredDisplay; use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver}; use crate::store::{IndexerStore, PgIndexerStore}; use crate::types::{ @@ -537,71 +531,41 @@ where ) -> Result { let _timer = metrics.indexing_objects_latency.start_timer(); let checkpoint_seq = data.checkpoint_summary.sequence_number; - let deleted_objects = data - .transactions - .iter() - .flat_map(|tx| get_deleted_objects(&tx.effects)) - .collect::>(); - let deleted_object_ids = deleted_objects - .iter() - .map(|o| (o.0, o.1)) - .collect::>(); - let indexed_deleted_objects = deleted_objects + + let eventually_removed_object_refs_post_version = + data.eventually_removed_object_refs_post_version(); + let indexed_eventually_removed_objects = eventually_removed_object_refs_post_version .into_iter() - .map(|o| IndexedDeletedObject { - object_id: o.0, - object_version: o.1.value(), + .map(|obj_ref| IndexedDeletedObject { + object_id: obj_ref.0, + object_version: obj_ref.1.into(), checkpoint_sequence_number: checkpoint_seq, }) .collect(); - let (latest_objects, intermediate_versions) = get_latest_objects(data.output_objects()); - - let live_objects: Vec = data - .transactions - .iter() - .flat_map(|tx| { - let CheckpointTransaction { - transaction: tx, - effects: fx, - .. - } = tx; - fx.all_changed_objects() - .into_iter() - .filter_map(|(oref, _owner, _kind)| { - // We don't care about objects that are deleted or updated more than once - if intermediate_versions.contains(&(oref.0, oref.1)) - || deleted_object_ids.contains(&(oref.0, oref.1)) - { - return None; - } - let object = latest_objects.get(&(oref.0)).unwrap_or_else(|| { - panic!( - "object {:?} not found in CheckpointData (tx_digest: {})", - oref.0, - tx.digest() - ) - }); - assert_eq!(oref.1, object.version()); - Some(object.clone()) - }) - .collect::>() - }) - .collect(); - + let latest_live_output_objects = data.latest_live_output_objects(); + let latest_live_output_object_map = latest_live_output_objects + .clone() + .into_iter() + .map(|o| (o.id(), o.clone())) + .collect::>(); let move_struct_layout_map = - get_move_struct_layout_map(&live_objects, package_resolver).await?; - let changed_objects = live_objects + get_move_struct_layout_map(latest_live_output_objects.clone(), package_resolver) + .await?; + let changed_objects = latest_live_output_objects .into_iter() .map(|o| { - let df_info = - try_create_dynamic_field_info(&o, &move_struct_layout_map, &latest_objects); - df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o, info)) + let df_info = try_create_dynamic_field_info( + o, + &move_struct_layout_map, + &latest_live_output_object_map, + ); + df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o.clone(), info)) }) .collect::, _>>()?; Ok(TransactionObjectChangesToCommit { changed_objects, - deleted_objects: indexed_deleted_objects, + deleted_objects: indexed_eventually_removed_objects, }) } @@ -614,59 +578,42 @@ where let deleted_objects = data .transactions .iter() - .flat_map(|tx| get_deleted_objects(&tx.effects)) + .flat_map(|tx| tx.removed_object_refs_post_version()) .collect::>(); let indexed_deleted_objects: Vec = deleted_objects .into_iter() - .map(|o| IndexedDeletedObject { - object_id: o.0, - object_version: o.1.value(), + .map(|obj_ref| IndexedDeletedObject { + object_id: obj_ref.0, + object_version: obj_ref.1.into(), checkpoint_sequence_number: checkpoint_seq, }) .collect(); - let (latest_objects, _) = get_latest_objects(data.output_objects()); - let history_object_map = data - .output_objects() + let latest_live_output_objects = data.latest_live_output_objects(); + let latest_live_output_object_map = latest_live_output_objects + .clone() .into_iter() - .map(|o| ((o.id(), o.version()), o.clone())) + .map(|o| (o.id(), o.clone())) .collect::>(); - let history_objects: Vec = data + let output_objects = data .transactions .iter() - .flat_map(|tx| { - let CheckpointTransaction { - transaction: tx, - effects: fx, - .. - } = tx; - fx.all_changed_objects() - .into_iter() - .map(|(oref, _owner, _kind)| { - let history_object = history_object_map.get(&(oref.0, oref.1)).unwrap_or_else(|| { - panic!( - "object {:?} version {:?} not found in CheckpointData (tx_digest: {})", - oref.0, - oref.1, - tx.digest() - ) - }); - assert_eq!(oref.2, history_object.digest()); - history_object.clone() - }) - .collect::>() - }) - .collect(); - + .flat_map(|tx| &tx.output_objects) + .collect::>(); + // TODO(gegaowp): the current df_info implementation is not correct, + // but we have decided remove all df_* except df_kind. let move_struct_layout_map = - get_move_struct_layout_map(&history_objects, package_resolver).await?; - let changed_objects = history_objects + get_move_struct_layout_map(output_objects.clone(), package_resolver).await?; + let changed_objects = output_objects .into_iter() .map(|o| { - let df_info = - try_create_dynamic_field_info(&o, &move_struct_layout_map, &latest_objects); - df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o, info)) + let df_info = try_create_dynamic_field_info( + o, + &move_struct_layout_map, + &latest_live_output_object_map, + ); + df_info.map(|info| IndexedObject::from_object(checkpoint_seq, o.clone(), info)) }) .collect::, _>>()?; @@ -685,8 +632,9 @@ where .iter() .flat_map(|data| { let checkpoint_sequence_number = data.checkpoint_summary.sequence_number; - data.output_objects() + data.transactions .iter() + .flat_map(|tx| &tx.output_objects) .filter_map(|o| { if let sui_types::object::Data::Package(p) = &o.data { Some(IndexedPackage { @@ -710,8 +658,9 @@ where .iter() .flat_map(|data| { let checkpoint_sequence_number = data.checkpoint_summary.sequence_number; - data.output_objects() + data.transactions .iter() + .flat_map(|tx| &tx.output_objects) .filter_map(|o| { if let sui_types::object::Data::Package(p) = &o.data { let indexed_pkg = IndexedPackage { @@ -719,7 +668,7 @@ where move_package: p.clone(), checkpoint_sequence_number, }; - Some((indexed_pkg, (**o).clone())) + Some((indexed_pkg, o.clone())) } else { None } @@ -741,11 +690,11 @@ where } async fn get_move_struct_layout_map( - objects: &[Object], + objects: Vec<&Object>, package_resolver: Arc>, ) -> Result, IndexerError> { let struct_tags = objects - .iter() + .into_iter() .filter_map(|o| { let move_object = o.data.try_as_move().cloned(); move_object.map(|move_object| { @@ -796,40 +745,6 @@ async fn get_move_struct_layout_map( Ok(move_struct_layout_map) } -pub fn get_deleted_objects(effects: &TransactionEffects) -> Vec { - let deleted = effects.deleted().into_iter(); - let wrapped = effects.wrapped().into_iter(); - let unwrapped_then_deleted = effects.unwrapped_then_deleted().into_iter(); - deleted - .chain(wrapped) - .chain(unwrapped_then_deleted) - .collect::>() -} - -pub fn get_latest_objects( - objects: Vec<&Object>, -) -> ( - HashMap, - HashSet<(ObjectID, SequenceNumber)>, -) { - let mut latest_objects = HashMap::new(); - let mut discarded_versions = HashSet::new(); - for object in objects { - match latest_objects.entry(object.id()) { - Entry::Vacant(e) => { - e.insert(object.clone()); - } - Entry::Occupied(mut e) => { - if object.version() > e.get().version() { - discarded_versions.insert((e.get().id(), e.get().version())); - e.insert(object.clone()); - } - } - } - } - (latest_objects, discarded_versions) -} - fn try_create_dynamic_field_info( o: &Object, struct_tag_to_move_struct_layout: &HashMap, diff --git a/crates/sui-indexer/src/handlers/tx_processor.rs b/crates/sui-indexer/src/handlers/tx_processor.rs index 04d96d6eafd04..2ee43f5880ead 100644 --- a/crates/sui-indexer/src/handlers/tx_processor.rs +++ b/crates/sui-indexer/src/handlers/tx_processor.rs @@ -30,7 +30,6 @@ use sui_types::messages_checkpoint::CheckpointSequenceNumber; use crate::errors::IndexerError; use crate::metrics::IndexerMetrics; - use crate::types::IndexedPackage; use crate::types::{IndexedObjectChange, IndexerResult}; @@ -289,9 +288,8 @@ pub(crate) struct EpochEndIndexingObjectStore<'a> { impl<'a> EpochEndIndexingObjectStore<'a> { pub fn new(data: &'a CheckpointData) -> Self { - // We only care about output objects for end-of-epoch indexing Self { - objects: data.output_objects(), + objects: data.latest_live_output_objects(), } } } diff --git a/crates/sui-types/src/full_checkpoint_content.rs b/crates/sui-types/src/full_checkpoint_content.rs index 02b7a897c3bed..4d0cb95217bbc 100644 --- a/crates/sui-types/src/full_checkpoint_content.rs +++ b/crates/sui-types/src/full_checkpoint_content.rs @@ -1,7 +1,12 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::effects::{IDOperation, ObjectIn, ObjectOut, TransactionEffects, TransactionEvents}; +use std::collections::BTreeMap; + +use crate::base_types::ObjectRef; +use crate::effects::{ + IDOperation, ObjectIn, ObjectOut, TransactionEffects, TransactionEffectsAPI, TransactionEvents, +}; use crate::messages_checkpoint::{CertifiedCheckpointSummary, CheckpointContents}; use crate::object::Object; use crate::storage::BackingPackageStore; @@ -18,11 +23,32 @@ pub struct CheckpointData { } impl CheckpointData { - pub fn output_objects(&self) -> Vec<&Object> { - self.transactions - .iter() - .flat_map(|tx| &tx.output_objects) - .collect() + // returns the latest versions of the output objects that still exist at the end of the checkpoint + pub fn latest_live_output_objects(&self) -> Vec<&Object> { + let mut latest_live_objects = BTreeMap::new(); + for tx in self.transactions.iter() { + for obj in tx.output_objects.iter() { + latest_live_objects.insert(obj.id(), obj); + } + for obj_ref in tx.removed_object_refs_post_version() { + latest_live_objects.remove(&(obj_ref.0)); + } + } + latest_live_objects.into_values().collect() + } + + // returns the object refs that are eventually deleted or wrapped in the current checkpoint + pub fn eventually_removed_object_refs_post_version(&self) -> Vec { + let mut eventually_removed_object_refs = BTreeMap::new(); + for tx in self.transactions.iter() { + for obj_ref in tx.removed_object_refs_post_version() { + eventually_removed_object_refs.insert(obj_ref.0, obj_ref); + } + for obj in tx.output_objects.iter() { + eventually_removed_object_refs.remove(&(obj.id())); + } + } + eventually_removed_object_refs.into_values().collect() } pub fn input_objects(&self) -> Vec<&Object> { @@ -51,19 +77,21 @@ pub struct CheckpointTransaction { pub events: Option, /// The state of all inputs to this transaction as they were prior to execution. pub input_objects: Vec, - /// The state of all output objects created or mutated by this transaction. + /// The state of all output objects created or mutated or unwrapped by this transaction. pub output_objects: Vec, } impl CheckpointTransaction { // provide an iterator over all deleted or wrapped objects in this transaction - pub fn removed_objects(&self) -> impl Iterator { + pub fn removed_objects_pre_version(&self) -> impl Iterator { // Iterator over id and versions for all deleted or wrapped objects match &self.effects { TransactionEffects::V1(v1) => Either::Left( // Effects v1 has delted and wrapped objects versions as the "new" version, not the // old one that was actually removed. So we need to take these and then look them // up in the `modified_at_versions`. + // No need to chain unwrapped_then_deleted because these objects must have been wrapped + // before the transaction, hence they will not be in modified_at_versions / input_objects. v1.deleted().iter().chain(v1.wrapped()).map(|(id, _, _)| { // lookup the old version for mutated objects let (_, old_version) = v1 @@ -108,6 +136,13 @@ impl CheckpointTransaction { }) } + pub fn removed_object_refs_post_version(&self) -> impl Iterator { + let deleted = self.effects.deleted().into_iter(); + let wrapped = self.effects.wrapped().into_iter(); + let unwrapped_then_deleted = self.effects.unwrapped_then_deleted().into_iter(); + deleted.chain(wrapped).chain(unwrapped_then_deleted) + } + pub fn changed_objects(&self) -> impl Iterator)> { // Iterator over ((ObjectId, new version), Option) match &self.effects {