Skip to content

Commit

Permalink
repair coin index
Browse files Browse the repository at this point in the history
  • Loading branch information
phoenix-o committed Aug 29, 2024
1 parent df95820 commit e5db547
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 4 deletions.
4 changes: 3 additions & 1 deletion crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::execution_cache::ExecutionCacheTraitPointers;
use crate::execution_cache::TransactionCacheRead;
use crate::rest_index::RestIndexStore;
use crate::transaction_outputs::TransactionOutputs;
use crate::verify_indexes::verify_indexes;
use crate::verify_indexes::{fix_indexes, verify_indexes};
use anyhow::anyhow;
use arc_swap::{ArcSwap, Guard};
use async_trait::async_trait;
Expand Down Expand Up @@ -2704,6 +2704,8 @@ impl AuthorityState {
validator_tx_finalizer,
});

let state_clone = state.clone();
spawn_monitored_task!(fix_indexes(state_clone));
// Start a task to execute ready certificates.
let authority_state = Arc::downgrade(&state);
spawn_monitored_task!(execution_process(
Expand Down
53 changes: 53 additions & 0 deletions crates/sui-core/src/verify_indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
use std::{collections::BTreeMap, sync::Arc};

use anyhow::{anyhow, bail, Result};
use sui_storage::indexes::CoinIndexKey;
use sui_storage::{indexes::CoinInfo, IndexStore};
use sui_types::{base_types::ObjectInfo, object::Owner};
use tracing::info;
use typed_store::traits::Map;

use crate::authority::AuthorityState;
use crate::{authority::authority_store_tables::LiveObject, state_accumulator::AccumulatorStore};

/// This is a very expensive function that verifies some of the secondary indexes. This is done by
Expand Down Expand Up @@ -88,3 +90,54 @@ pub fn verify_indexes(store: &dyn AccumulatorStore, indexes: Arc<IndexStore>) ->

Ok(())
}

pub async fn fix_indexes(authority_state: Arc<AuthorityState>) -> Result<()> {
let is_violation = |coin_index_key: &CoinIndexKey,
state: &Arc<AuthorityState>|
-> anyhow::Result<bool> {
if let Some(object) = state.get_object_store().get_object(&coin_index_key.2)? {
if matches!(object.owner, Owner::AddressOwner(real_owner_id) | Owner::ObjectOwner(real_owner_id) if coin_index_key.0 == real_owner_id)
{
return Ok(false);
}
}
Ok(true)
};

tracing::info!("Starting fixing coin index");
// populate candidate list without locking. Some entries are benign
let authority_state_clone = authority_state.clone();
let candidates = tokio::task::spawn_blocking(move || {
let mut batch = vec![];
if let Some(indexes) = &authority_state_clone.indexes {
for (coin_index_key, _) in indexes.tables().coin_index().unbounded_iter() {
if is_violation(&coin_index_key, &authority_state_clone)? {
batch.push(coin_index_key);
}
}
}
Ok::<Vec<_>, anyhow::Error>(batch)
})
.await??;

if let Some(indexes) = &authority_state.indexes {
for chunk in candidates.chunks(100) {
let _locks = indexes
.caches
.locks
.acquire_locks(chunk.iter().map(|key| key.0))
.await;
let mut batch = vec![];
for key in chunk {
if is_violation(key, &authority_state)? {
batch.push(key);
}
}
let mut wb = indexes.tables().coin_index().batch();
wb.delete_batch(indexes.tables().coin_index(), batch)?;
wb.write()?;
}
}
tracing::info!("Finished fix for the coin index");
Ok(())
}
6 changes: 3 additions & 3 deletions crates/sui-storage/src/indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use typed_store::traits::{TableSummary, TypedStoreDebug};
use typed_store::DBMapUtils;

type OwnerIndexKey = (SuiAddress, ObjectID);
type CoinIndexKey = (SuiAddress, String, ObjectID);
pub type CoinIndexKey = (SuiAddress, String, ObjectID);
type DynamicFieldKey = (ObjectID, ObjectID);
type EventId = (TxSequenceNumber, usize);
type EventIndex = (TransactionEventsDigest, TransactionDigest, u64);
Expand Down Expand Up @@ -129,7 +129,7 @@ impl IndexStoreMetrics {
pub struct IndexStoreCaches {
per_coin_type_balance: ShardedLruCache<(SuiAddress, TypeTag), SuiResult<TotalBalance>>,
all_balances: ShardedLruCache<SuiAddress, SuiResult<Arc<HashMap<TypeTag, TotalBalance>>>>,
locks: MutexTable<SuiAddress>,
pub locks: MutexTable<SuiAddress>,
}

#[derive(Default)]
Expand Down Expand Up @@ -229,7 +229,7 @@ impl IndexStoreTables {
pub struct IndexStore {
next_sequence_number: AtomicU64,
tables: IndexStoreTables,
caches: IndexStoreCaches,
pub caches: IndexStoreCaches,
metrics: Arc<IndexStoreMetrics>,
max_type_length: u64,
remove_deprecated_tables: bool,
Expand Down

0 comments on commit e5db547

Please sign in to comment.