diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 10752097e09ed..2f88601337da0 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -19,6 +19,7 @@ use move_binary_format::binary_config::BinaryConfig; use move_binary_format::CompiledModule; use move_core_types::annotated_value::MoveStructLayout; use move_core_types::language_storage::ModuleId; +use mysten_common::fatal; use mysten_metrics::{TX_TYPE_SHARED_OBJ_TX, TX_TYPE_SINGLE_WRITER_TX}; use parking_lot::Mutex; use prometheus::{ @@ -1606,6 +1607,8 @@ impl AuthorityState { .force_reload_system_packages(&BuiltInFramework::all_package_ids()); } + epoch_store.remove_shared_version_assignments(&tx_key); + // commit_certificate finished, the tx is fully committed to the store. tx_guard.commit_tx(); @@ -3058,15 +3061,6 @@ impl AuthorityState { .enqueue_certificates(certs, epoch_store) } - pub(crate) fn enqueue_with_expected_effects_digest( - &self, - certs: Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>, - epoch_store: &AuthorityPerEpochStore, - ) { - self.transaction_manager - .enqueue_with_expected_effects_digest(certs, epoch_store) - } - fn create_owner_index_if_empty( &self, genesis_objects: &[Object], @@ -3153,6 +3147,7 @@ impl AuthorityState { epoch_start_configuration: EpochStartConfiguration, accumulator: Arc, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, + epoch_last_checkpoint: CheckpointSequenceNumber, ) -> SuiResult> { Self::check_protocol_version( supported_protocol_versions, @@ -3209,6 +3204,7 @@ impl AuthorityState { new_committee, epoch_start_configuration, expensive_safety_check_config, + epoch_last_checkpoint, ) .await?; assert_eq!(new_epoch_store.epoch(), new_epoch); @@ -3239,6 +3235,12 @@ impl AuthorityState { self.get_backing_package_store().clone(), self.get_object_store().clone(), &self.config.expensive_safety_check_config, + *self + .checkpoint_store + .get_epoch_last_checkpoint(epoch_store.epoch()) + .unwrap() + .unwrap() + .sequence_number(), ); let new_epoch = new_epoch_store.epoch(); self.transaction_manager.reconfigure(new_epoch); @@ -5158,6 +5160,7 @@ impl AuthorityState { new_committee: Committee, epoch_start_configuration: EpochStartConfiguration, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, + epoch_last_checkpoint: CheckpointSequenceNumber, ) -> SuiResult> { let new_epoch = new_committee.epoch; info!(new_epoch = ?new_epoch, "re-opening AuthorityEpochTables for new epoch"); @@ -5174,6 +5177,7 @@ impl AuthorityState { self.get_object_store().clone(), expensive_safety_check_config, cur_epoch_store.get_chain_identifier(), + epoch_last_checkpoint, ); self.epoch_store.store(new_epoch_store.clone()); cur_epoch_store.epoch_terminated().await; @@ -5275,6 +5279,15 @@ impl RandomnessRoundReceiver { let transaction = VerifiedExecutableTransaction::new_system(transaction, epoch); let digest = *transaction.digest(); + // Randomness state updates contain the full bls signature for the random round, + // which cannot necessarily be reconstructed again later. Therefore we must immediately + // persist this transaction. If we crash before its outputs are committed, this + // ensures we will be able to re-execute it. + self.authority_state + .get_cache_commit() + .persist_transaction(&transaction) + .expect("failed to persist randomness state update transaction"); + // Send transaction to TransactionManager for execution. 
self.authority_state .transaction_manager() @@ -5315,9 +5328,9 @@ impl RandomnessRoundReceiver { let mut effects = result.unwrap_or_else(|_| panic!("failed to get effects for randomness state update transaction at epoch {epoch}, round {round}")); let effects = effects.pop().expect("should return effects"); if *effects.status() != ExecutionStatus::Success { - panic!("failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"); + fatal!("failed to execute randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"); } - debug!("successfully executed randomness state update transaction at epoch {epoch}, round {round}"); + debug!("successfully executed randomness state update transaction at epoch {epoch}, round {round}: {effects:?}"); }); } } diff --git a/crates/sui-core/src/authority/authority_per_epoch_store.rs b/crates/sui-core/src/authority/authority_per_epoch_store.rs index 70c1e98bbb2e1..bec10ca46fced 100644 --- a/crates/sui-core/src/authority/authority_per_epoch_store.rs +++ b/crates/sui-core/src/authority/authority_per_epoch_store.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use arc_swap::ArcSwapOption; +use dashmap::DashMap; use enum_dispatch::enum_dispatch; use fastcrypto::groups::bls12381; use fastcrypto_tbls::dkg; @@ -14,7 +15,7 @@ use itertools::{izip, Itertools}; use parking_lot::RwLock; use parking_lot::{Mutex, RwLockReadGuard, RwLockWriteGuard}; use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; +use std::collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}; use std::future::Future; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -71,7 +72,7 @@ use crate::epoch::randomness::{ VersionedUsedProcessedMessages, SINGLETON_KEY, }; use crate::epoch::reconfiguration::ReconfigState; -use crate::execution_cache::ObjectCacheRead; +use crate::execution_cache::{ObjectCacheRead, TransactionCacheRead}; use crate::module_cache_metrics::ResolverMetrics; use crate::post_consensus_tx_reorder::PostConsensusTxReorder; use crate::signature_verifier::*; @@ -291,6 +292,12 @@ pub struct AuthorityPerEpochStore { /// and it needs to be cleared at the end of the epoch. tables: ArcSwapOption, + consensus_quarantine: RwLock, + + shared_version_assignments: DashMap>, + deferred_transactions: Mutex>>, + user_signatures_for_checkpoints: RwLock>>, + protocol_config: ProtocolConfig, // needed for re-opening epoch db. @@ -404,37 +411,14 @@ pub struct AuthorityEpochTables { /// Transactions that were executed in the current epoch. executed_in_epoch: DBMap, - /// The tables below manage shared object locks / versions. There are three ways they can be - /// updated: - /// 1. (validators only): Upon receiving a certified transaction from consensus, the authority - /// assigns the next version to each shared object of the transaction. The next versions of - /// the shared objects are updated as well. - /// 2. (validators only): Upon receiving a new consensus commit, the authority assigns the - /// next version of the randomness state object to an expected future transaction to be - /// generated after the next random value is available. The next version of the randomness - /// state object is updated as well. - /// 3. (fullnodes + validators): Upon receiving a certified effect from state sync, or - /// transaction orchestrator fast execution path, the node assigns the shared object - /// versions from the transaction effect. 
Next object versions are not updated. - /// - /// REQUIRED: all authorities must assign the same shared object versions for each transaction. + #[allow(dead_code)] assigned_shared_object_versions_v2: DBMap>, + + /// Next available shared object versions for each shared object. next_shared_object_versions: DBMap, - /// Deprecated table for pre-random-beacon shared object versions. + /// Deprecated table #[allow(dead_code)] - assigned_shared_object_versions: DBMap>, - - /// Certificates that have been received from clients or received from consensus, but not yet - /// executed. Entries are cleared after execution. - /// This table is critical for crash recovery, because usually the consensus output progress - /// is updated after a certificate is committed into this table. - /// - /// In theory, this table may be superseded by storing consensus and checkpoint execution - /// progress. But it is more complex, because it would be necessary to track inflight - /// executions not ordered by indices. For now, tracking inflight certificates as a map - /// seems easier. - #[default_options_override_fn = "pending_execution_table_default_config"] pub(crate) pending_execution: DBMap, /// Track which transactions have been processed in handle_consensus_transaction. We must be @@ -479,15 +463,7 @@ pub struct AuthorityEpochTables { #[allow(dead_code)] final_epoch_checkpoint: DBMap, - /// This table has information for the checkpoints for which we constructed all the data - /// from consensus, but not yet constructed actual checkpoint. - /// - /// Key in this table is the consensus commit height and not a checkpoint sequence number. - /// - /// Non-empty list of transactions here might result in empty list when we are forming checkpoint. - /// Because we don't want to create checkpoints with empty content(see CheckpointBuilder::write_checkpoint), - /// the sequence number of checkpoint does not match height here. - #[default_options_override_fn = "pending_checkpoints_table_default_config"] + #[allow(dead_code)] pending_checkpoints_v2: DBMap, /// Deprecated table for pre-random-beacon checkpoints. @@ -506,8 +482,8 @@ pub struct AuthorityEpochTables { pending_checkpoint_signatures: DBMap<(CheckpointSequenceNumber, u64), CheckpointSignatureMessage>, - /// When we see certificate through consensus for the first time, we record - /// user signature for this transaction here. This will be included in the checkpoint later. + /// Deprecated - pending signatures are now stored in memory. 
+ #[allow(dead_code)] user_signatures_for_checkpoints: DBMap>, /// This table is not used @@ -614,24 +590,12 @@ fn owned_object_transaction_locks_table_default_config() -> DBOptions { } } -fn pending_execution_table_default_config() -> DBOptions { - default_db_options() - .optimize_for_write_throughput() - .optimize_for_large_values_no_scan(1 << 10) -} - fn pending_consensus_transactions_table_default_config() -> DBOptions { default_db_options() .optimize_for_write_throughput() .optimize_for_large_values_no_scan(1 << 10) } -fn pending_checkpoints_table_default_config() -> DBOptions { - default_db_options() - .optimize_for_write_throughput() - .optimize_for_large_values_no_scan(1 << 10) -} - impl AuthorityEpochTables { pub fn open(epoch: EpochId, parent_path: &Path, db_options: Option) -> Self { Self::open_tables_transactional( @@ -746,6 +710,15 @@ impl AuthorityEpochTables { batch.write()?; Ok(()) } + + fn get_all_deferred_transactions( + &self, + ) -> SuiResult>> { + Ok(self + .deferred_transactions + .safe_iter() + .collect::>()?) + } } pub(crate) const MUTEX_TABLE_SIZE: usize = 1024; @@ -765,6 +738,7 @@ impl AuthorityPerEpochStore { signature_verifier_metrics: Arc, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, chain_identifier: ChainIdentifier, + highest_executed_checkpoint: CheckpointSequenceNumber, ) -> Arc { let current_time = Instant::now(); let epoch_id = committee.epoch; @@ -864,11 +838,21 @@ impl AuthorityPerEpochStore { let jwk_aggregator = Mutex::new(jwk_aggregator); + let deferred_transactions = tables + .get_all_deferred_transactions() + .expect("load deferred transactions cannot fail"); + let s = Arc::new(Self { name, committee, protocol_config, tables: ArcSwapOption::new(Some(Arc::new(tables))), + shared_version_assignments: Default::default(), + deferred_transactions: Mutex::new(deferred_transactions), + user_signatures_for_checkpoints: Default::default(), + consensus_quarantine: RwLock::new(ConsensusOutputQuarantine::new( + highest_executed_checkpoint, + )), parent_path: parent_path.to_path_buf(), db_options, reconfig_state_mem: RwLock::new(reconfig_state), @@ -1041,6 +1025,7 @@ impl AuthorityPerEpochStore { object_store: Arc, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, chain_identifier: ChainIdentifier, + previous_epoch_last_checkpoint: CheckpointSequenceNumber, ) -> Arc { assert_eq!(self.epoch() + 1, new_committee.epoch); self.record_reconfig_halt_duration_metric(); @@ -1058,6 +1043,7 @@ impl AuthorityPerEpochStore { self.signature_verifier.metrics.clone(), expensive_safety_check_config, chain_identifier, + previous_epoch_last_checkpoint, ) } @@ -1066,6 +1052,7 @@ impl AuthorityPerEpochStore { backing_package_store: Arc, object_store: Arc, expensive_safety_check_config: &ExpensiveSafetyCheckConfig, + previous_epoch_last_checkpoint: CheckpointSequenceNumber, ) -> Arc { let next_epoch = self.epoch() + 1; let next_committee = Committee::new( @@ -1081,6 +1068,7 @@ impl AuthorityPerEpochStore { object_store, expensive_safety_check_config, self.chain_identifier, + previous_epoch_last_checkpoint, ) } @@ -1266,6 +1254,10 @@ impl AuthorityPerEpochStore { Ok(()) } + pub(crate) fn remove_shared_version_assignments(&self, tx_key: &TransactionKey) { + self.shared_version_assignments.remove(tx_key); + } + pub fn revert_executed_transaction(&self, tx_digest: &TransactionDigest) -> SuiResult { let tables = self.tables()?; let mut batch = tables.effects_signatures.batch(); @@ -1333,23 +1325,28 @@ impl AuthorityPerEpochStore { &self, key: &TransactionKey, 
objects: &[InputObjectKind], - ) -> BTreeSet { - let mut shared_locks = HashMap::::new(); + ) -> SuiResult> { + let shared_locks = + once_cell::unsync::OnceCell::>>::new(); objects .iter() .map(|kind| { - match kind { + Ok(match kind { InputObjectKind::SharedMoveObject { id, .. } => { - if shared_locks.is_empty() { - shared_locks = self - .get_shared_locks(key) - .expect("Read from storage should not fail!") - .into_iter() - .collect(); - } - // If we can't find the locked version, it means - // 1. either we have a bug that skips shared object version assignment - // 2. or we have some DB corruption + // It is possible that no version assignments exist for this transaction yet, + // in which case we return an error rather than panicking. + let shared_locks = shared_locks + .get_or_init(|| { + self.get_shared_locks(key) + .map(|locks| locks.into_iter().collect()) + }) + .as_ref() + .ok_or(SuiError::GenericAuthorityError { + error: "no shared locks".to_string(), + })?; + + // If we found locks, but they are missing the assignment for this object, + // it indicates a serious inconsistency! let Some(version) = shared_locks.get(id) else { panic!( "Shared object locks should have been set. key: {key:?}, obj \ @@ -1366,12 +1363,16 @@ impl AuthorityPerEpochStore { id: objref.0, version: objref.1, }, - } + }) }) .collect() } pub fn get_last_consensus_stats(&self) -> SuiResult { + assert!( + !self.consensus_quarantine.read().has_last_consensus_stats(), + "get_last_consensus_stats should only be called at startup" + ); match self .tables()? .get_last_consensus_stats() @@ -1438,21 +1439,13 @@ impl AuthorityPerEpochStore { Ok(result) } - /// `pending_certificates` table related methods. Should only be used from TransactionManager. - - /// Gets all pending certificates. Used during recovery. - pub fn all_pending_execution(&self) -> SuiResult> { - Ok(self - .tables()? - .pending_execution - .unbounded_iter() - .map(|(_, cert)| cert.into()) - .collect()) - } - /// Called when transaction outputs are committed to disk #[instrument(level = "trace", skip_all)] - pub fn handle_committed_transactions(&self, digests: &[TransactionDigest]) -> SuiResult<()> { + pub fn handle_finalized_checkpoint( + &self, + checkpoint: &CheckpointSummary, + digests: &[TransactionDigest], + ) -> SuiResult<()> { let tables = match self.tables() { Ok(tables) => tables, // After Epoch ends, it is no longer necessary to remove pending transactions Err(SuiError::EpochEnded(_)) => return Ok(()), Err(e) => return Err(e), }; - let mut batch = tables.pending_execution.batch(); - // pending_execution stores transactions received from consensus which may not have - // been executed yet. At this point, they have been committed to the db durably and - // can be removed. - // After end-to-end quarantining, we will not need pending_execution since the consensus - // log itself will be used for recovery. - batch.delete_batch(&tables.pending_execution, digests)?; + let mut batch = tables.signed_effects_digests.batch(); // Now that the transaction effects are committed, we will never re-execute, so we // don't need to worry about equivocating. // Note that this does not delete keys for random transactions. The worst case result // of this is that we restart at the end of the epoch and load about 160k keys into // memory.
+ // TODO: remove batch.delete_batch( &tables.assigned_shared_object_versions_v2, digests.iter().map(|d| TransactionKey::Digest(*d)), )?; + let seq = *checkpoint.sequence_number(); + + let mut quarantine = self.consensus_quarantine.write(); + quarantine.update_highest_executed_checkpoint(seq, self, &mut batch)?; batch.write()?; + Ok(()) } @@ -1502,11 +1495,12 @@ impl AuthorityPerEpochStore { pub fn set_shared_object_versions_for_testing( &self, tx_digest: &TransactionDigest, - assigned_versions: &Vec<(ObjectID, SequenceNumber)>, + assigned_versions: &[(ObjectID, SequenceNumber)], ) -> SuiResult { - self.tables()? - .assigned_shared_object_versions_v2 - .insert(&TransactionKey::Digest(*tx_digest), assigned_versions)?; + self.shared_version_assignments.insert( + TransactionKey::Digest(*tx_digest), + assigned_versions.to_owned(), + ); Ok(()) } @@ -1605,8 +1599,10 @@ impl AuthorityPerEpochStore { let ids: Vec<_> = objects_to_init.iter().map(|(id, _)| *id).collect(); - let next_versions = db_transaction - .multi_get(&self.tables()?.next_shared_object_versions, ids.clone())?; + let next_versions = self + .consensus_quarantine + .read() + .get_next_shared_object_versions(&tables, &ids)?; let uninitialized_objects: Vec<(ObjectID, SequenceNumber)> = next_versions .iter() @@ -1648,24 +1644,18 @@ impl AuthorityPerEpochStore { ?versions_to_write, "initializing next_shared_object_versions" ); - db_transaction.insert_batch( - &self.tables()?.next_shared_object_versions, - versions_to_write, - )?; + db_transaction.insert_batch(&tables.next_shared_object_versions, versions_to_write)?; db_transaction.commit() })?; Ok(ret) } - async fn set_assigned_shared_object_versions_with_db_batch( - &self, - versions: AssignedTxAndVersions, - db_batch: &mut DBBatch, - ) -> SuiResult { + fn set_assigned_shared_object_versions(&self, versions: AssignedTxAndVersions) { debug!("set_assigned_shared_object_versions: {:?}", versions); - db_batch.insert_batch(&self.tables()?.assigned_shared_object_versions_v2, versions)?; - Ok(()) + for (key, value) in &versions { + self.shared_version_assignments.insert(*key, value.clone()); + } } /// Given list of certificates, assign versions for all shared objects used in them. @@ -1679,7 +1669,6 @@ impl AuthorityPerEpochStore { cache_reader: &dyn ObjectCacheRead, certificates: &[VerifiedExecutableTransaction], ) -> SuiResult { - let mut db_batch = self.tables()?.assigned_shared_object_versions_v2.batch(); let assigned_versions = SharedObjVerManager::assign_versions_from_consensus( self, cache_reader, @@ -1689,9 +1678,7 @@ impl AuthorityPerEpochStore { ) .await? .assigned_versions; - self.set_assigned_shared_object_versions_with_db_batch(assigned_versions, &mut db_batch) - .await?; - db_batch.write()?; + self.set_assigned_shared_object_versions(assigned_versions); Ok(()) } @@ -1750,22 +1737,18 @@ impl AuthorityPerEpochStore { debug!("Query epoch store to load deferred txn {:?} {:?}", min, max); let mut keys = Vec::new(); let mut txns = Vec::new(); - self.tables()? 
- .deferred_transactions - .safe_iter_with_bounds(Some(min), Some(max)) - .try_for_each(|result| match result { - Ok((key, txs)) => { - debug!( - "Loaded {:?} deferred txn with deferral key {:?}", - txs.len(), - key - ); - keys.push(key); - txns.push((key, txs)); - Ok(()) - } - Err(err) => Err(err), - })?; + + let mut deferred_transactions = self.deferred_transactions.lock(); + + for (key, transactions) in deferred_transactions.range(min..max) { + debug!( + "Loaded {:?} deferred txn with deferral key {:?}", + transactions.len(), + key + ); + keys.push(*key); + txns.push((*key, transactions.clone())); + } // verify that there are no duplicates - should be impossible due to // is_consensus_message_processed @@ -1779,6 +1762,10 @@ impl AuthorityPerEpochStore { } } + for key in &keys { + deferred_transactions.remove(key); + } + output.delete_loaded_deferred_transactions(&keys); Ok(txns) @@ -1786,12 +1773,12 @@ impl AuthorityPerEpochStore { pub fn get_all_deferred_transactions_for_test( &self, - ) -> SuiResult)>> { - Ok(self - .tables()? - .deferred_transactions - .safe_iter() - .collect::, _>>()?) + ) -> Vec<(DeferralKey, Vec)> { + self.deferred_transactions + .lock() + .iter() + .map(|(key, txs)| (*key, txs.clone())) + .collect() } fn get_max_accumulated_txn_cost_per_object_in_commit(&self) -> Option { @@ -1866,10 +1853,7 @@ impl AuthorityPerEpochStore { cache_reader, ) .await?; - let mut db_batch = self.tables()?.assigned_shared_object_versions_v2.batch(); - self.set_assigned_shared_object_versions_with_db_batch(versions, &mut db_batch) - .await?; - db_batch.write()?; + self.set_assigned_shared_object_versions(versions); Ok(()) } @@ -1943,10 +1927,7 @@ impl AuthorityPerEpochStore { } pub fn deferred_transactions_empty(&self) -> bool { - self.tables() - .expect("deferred transactions should not be read past end of epoch") - .deferred_transactions - .is_empty() + self.deferred_transactions.lock().is_empty() } /// Check whether certificate was processed by consensus. @@ -1994,19 +1975,64 @@ impl AuthorityPerEpochStore { key: &SequencedConsensusTransactionKey, ) -> SuiResult { Ok(self - .tables()? - .consensus_message_processed - .contains_key(key)?) + .consensus_quarantine + .read() + .is_consensus_message_processed(key) || self + .tables()? + .consensus_message_processed + .contains_key(key)?) } pub fn check_consensus_messages_processed( &self, keys: impl Iterator, ) -> SuiResult> { - Ok(self + // If the results were frequently true, it could be better to check the in-memory store + // before reading from the DB, but most of the time this function is called on keys that + // have not been processed, so both reads are necessary anyway. + let keys = keys.collect::>(); + let mut results = self .tables()? .consensus_message_processed - .multi_contains_keys(keys)?) + .multi_contains_keys(&keys)?; + + for (key, result) in keys.iter().zip(&mut results) { + if !*result { + *result = self + .consensus_quarantine + .read() + .is_consensus_message_processed(key); + } + } + + Ok(results) + } + + /// Like consensus_message_processed_notify, but only checks the in-memory data. + /// This is correct because in-memory data contains only transactions that have + /// arrived after the last certified checkpoint, and checkpoints cannot contain + /// already-checkpointed transactions.
+ pub(crate) async fn consensus_messages_processed_notify_for_checkpoint( + &self, + keys: Vec, + ) -> Result<(), SuiError> { + let registrations = self.consensus_notify_read.register_all(&keys); + + let results = keys.iter().map(|key| { + self.consensus_quarantine + .read() + .is_consensus_message_processed(key) + }); + + let unprocessed_keys_registrations = registrations + .into_iter() + .zip(results) + .filter(|(_, processed)| !processed) + .map(|(registration, _)| registration); + + join_all(unprocessed_keys_registrations).await; + Ok(()) } pub async fn consensus_messages_processed_notify( @@ -2134,10 +2160,12 @@ impl AuthorityPerEpochStore { digests: &[TransactionDigest], ) -> SuiResult>> { assert_eq!(transactions.len(), digests.len()); - let signatures = self - .tables()? - .user_signatures_for_checkpoints - .multi_get(digests)?; + + let signatures: Vec<_> = { + let mut user_sigs = self.user_signatures_for_checkpoints.write(); + digests.iter().map(|d| user_sigs.remove(d)).collect() + }; + let mut result = Vec::with_capacity(digests.len()); for (signatures, transaction) in signatures.into_iter().zip(transactions.iter()) { let signatures = if let Some(signatures) = signatures { @@ -2330,29 +2358,7 @@ impl AuthorityPerEpochStore { } pub(crate) fn get_new_jwks(&self, round: u64) -> SuiResult> { - let epoch = self.epoch(); - - let empty_jwk_id = JwkId::new(String::new(), String::new()); - let empty_jwk = JWK { - kty: String::new(), - e: String::new(), - n: String::new(), - alg: String::new(), - }; - - let start = (round, (empty_jwk_id.clone(), empty_jwk.clone())); - let end = (round + 1, (empty_jwk_id, empty_jwk)); - - // TODO: use a safe iterator - Ok(self - .tables()? - .active_jwks - .safe_iter_with_bounds(Some(start), Some(end)) - .map_ok(|((r, (jwk_id, jwk)), _)| { - debug_assert!(round == r); - ActiveJwk { jwk_id, jwk, epoch } - }) - .collect::, _>>()?) + self.consensus_quarantine.read().get_new_jwks(self, round) } pub fn jwk_active_in_current_epoch(&self, jwk_id: &JwkId, jwk: &JWK) -> bool { @@ -2360,45 +2366,54 @@ impl AuthorityPerEpochStore { jwk_aggregator.has_quorum_for_key(&(jwk_id.clone(), jwk.clone())) } + pub(crate) fn get_randomness_last_round_timestamp(&self) -> SuiResult> { + if let Some(ts) = self + .consensus_quarantine + .read() + .get_randomness_last_round_timestamp() + { + Ok(Some(ts)) + } else { + Ok(self + .tables()? + .randomness_last_round_timestamp + .get(&SINGLETON_KEY)?) 
+ } + } + pub fn test_insert_user_signature( &self, digest: TransactionDigest, signatures: Vec, ) { - self.tables() - .expect("test should not cross epoch boundary") - .user_signatures_for_checkpoints - .insert(&digest, &signatures) - .unwrap(); + self.user_signatures_for_checkpoints + .write() + .insert(digest, signatures); let key = ConsensusTransactionKey::Certificate(digest); let key = SequencedConsensusTransactionKey::External(key); - self.tables() - .expect("test should not cross epoch boundary") - .consensus_message_processed - .insert(&key, &true) - .unwrap(); + + let mut output = ConsensusCommitOutput::default(); + output.record_consensus_message_processed(key.clone()); + output.set_default_commit_stats_for_testing(); + self.consensus_quarantine + .write() + .push_consensus_output(output); self.consensus_notify_read.notify(&key, &()); } fn finish_consensus_certificate_process_with_batch( &self, - output: &mut ConsensusCommitOutput, certificates: &[VerifiedExecutableTransaction], ) -> SuiResult { - output.insert_pending_execution(certificates); - output.insert_user_signatures_for_checkpoints(certificates); - - if cfg!(debug_assertions) { - for certificate in certificates { - // User signatures are written in the same batch as consensus certificate processed flag, - // which means we won't attempt to insert this twice for the same tx digest - assert!(!self - .tables()? - .user_signatures_for_checkpoints - .contains_key(certificate.digest()) - .unwrap()); - } + let mut user_sigs = self.user_signatures_for_checkpoints.write(); + for certificate in certificates { + // User signatures are written in the same batch as consensus certificate processed flag, + // which means we won't attempt to insert this twice for the same tx digest + assert!(user_sigs + .insert(*certificate.digest(), certificate.tx_signatures().to_vec()) + .is_none()); } + Ok(()) } @@ -2631,6 +2646,7 @@ impl AuthorityPerEpochStore { consensus_stats: &ExecutionIndicesWithStats, checkpoint_service: &Arc, cache_reader: &dyn ObjectCacheRead, + tx_reader: &dyn TransactionCacheRead, consensus_commit_info: &ConsensusCommitInfo, authority_metrics: &Arc, ) -> SuiResult> { @@ -2830,9 +2846,11 @@ impl AuthorityPerEpochStore { authority_metrics, ) .await?; - self.finish_consensus_certificate_process_with_batch(&mut output, &verified_transactions)?; + self.finish_consensus_certificate_process_with_batch(&verified_transactions)?; output.record_consensus_commit_stats(consensus_stats.clone()); + let mut verified_transactions = verified_transactions; + // Create pending checkpoints if we are still accepting tx. let should_accept_tx = if let Some(lock) = &lock { lock.should_accept_tx() @@ -2883,10 +2901,21 @@ impl AuthorityPerEpochStore { // - Exception: if DKG fails, we always need to write out a PendingCheckpoint // for randomness tx that are canceled. if let Some(randomness_round) = randomness_round { - randomness_roots.insert(TransactionKey::RandomnessRound( - self.epoch(), - randomness_round, - )); + let key = TransactionKey::RandomnessRound(self.epoch(), randomness_round); + + // During crash recovery, the randomness update transaction may already have been + // created and executed before the crash. If it is available locally, we need to + // ensure it is executed. + if let Some(digest) = self.tables()?.transaction_key_to_digest.get(&key)? { + if let Some(tx) = tx_reader.get_transaction_block(&digest)? 
{ + info!("Randomness update transaction {:?} already exists, scheduling for execution", digest); + let tx = + VerifiedExecutableTransaction::new_system((*tx).clone(), self.epoch()); + verified_transactions.push(tx); + } + } + + randomness_roots.insert(key); } if randomness_round.is_some() || (dkg_failed && !randomness_roots.is_empty()) { let pending_checkpoint = PendingCheckpointV2::V2(PendingCheckpointV2Contents { @@ -2901,9 +2930,14 @@ impl AuthorityPerEpochStore { } } - let mut batch = self.db_batch()?; - output.write_to_batch(self, &mut batch)?; - batch.write()?; + { + let mut consensus_quarantine = self.consensus_quarantine.write(); + consensus_quarantine.push_consensus_output(output); + + // we may already have observed the certified checkpoint for this round, if state sync is running + // ahead of consensus, so there may be data to commit right away. + consensus_quarantine.commit(self)?; + } // Only after batch is written, notify checkpoint service to start building any new // pending checkpoints. @@ -3029,19 +3063,33 @@ impl AuthorityPerEpochStore { cancelled_txns, ) .await?; - output.set_assigned_shared_object_versions(assigned_versions, shared_input_next_versions); + + for (tx_key, objects) in &assigned_versions { + debug!( + ?tx_key, + ?objects, + "storing shared object versions in memory" + ); + self.shared_version_assignments + .insert(*tx_key, objects.clone()); + } + + output.set_next_shared_object_versions(shared_input_next_versions); Ok(()) } + #[cfg(test)] + pub(crate) fn push_consensus_output_for_tests(&self, output: ConsensusCommitOutput) { + self.consensus_quarantine + .write() + .push_consensus_output(output); + } + #[cfg(any(test, feature = "test-utils"))] pub fn get_highest_pending_checkpoint_height(&self) -> CheckpointHeight { - self.tables() - .expect("test should not cross epoch boundary") - .pending_checkpoints_v2 - .unbounded_iter() - .skip_to_last() - .next() - .map(|(key, _)| key) + self.consensus_quarantine + .read() + .get_highest_pending_checkpoint_height() .unwrap_or_default() } @@ -3055,6 +3103,7 @@ impl AuthorityPerEpochStore { transactions: Vec, checkpoint_service: &Arc, cache_reader: &dyn ObjectCacheRead, + tx_reader: &dyn TransactionCacheRead, authority_metrics: &Arc, skip_consensus_commit_prologue_in_test: bool, ) -> SuiResult> { @@ -3063,6 +3112,7 @@ impl AuthorityPerEpochStore { &ExecutionIndicesWithStats::default(), checkpoint_service, cache_reader, + tx_reader, &ConsensusCommitInfo::new_for_test( if self.randomness_state_enabled() { self.get_highest_pending_checkpoint_height() / 2 + 1 @@ -3093,6 +3143,7 @@ impl AuthorityPerEpochStore { ) .await?; let mut batch = self.db_batch()?; + output.set_default_commit_stats_for_testing(); output.write_to_batch(self, &mut batch)?; batch.write()?; Ok(()) @@ -3259,10 +3310,15 @@ impl AuthorityPerEpochStore { let commit_has_deferred_txns = !deferred_txns.is_empty(); let mut total_deferred_txns = 0; - for (key, txns) in deferred_txns.into_iter() { - total_deferred_txns += txns.len(); - output.defer_transactions(key, txns); + { + let mut deferred_transactions = self.deferred_transactions.lock(); + for (key, txns) in deferred_txns.into_iter() { + total_deferred_txns += txns.len(); + deferred_transactions.insert(key, txns.clone()); + output.defer_transactions(key, txns); + } } + authority_metrics .consensus_handler_deferred_transactions .inc_by(total_deferred_txns as u64); @@ -3755,7 +3811,7 @@ impl AuthorityPerEpochStore { checkpoint: &PendingCheckpointV2, ) -> SuiResult { assert!( - 
self.get_pending_checkpoint(&checkpoint.height())?.is_none(), + !self.pending_checkpoint_exists(&checkpoint.height())?, "Duplicate pending checkpoint notification at height {:?}", checkpoint.height() ); @@ -3780,31 +3836,25 @@ impl AuthorityPerEpochStore { &self, last: Option, ) -> SuiResult> { - let tables = self.tables()?; - let mut iter = tables.pending_checkpoints_v2.unbounded_iter(); - if let Some(last_processed_height) = last { - iter = iter.skip_to(&(last_processed_height + 1))?; - } - Ok(iter.collect()) + Ok(self + .consensus_quarantine + .read() + .get_pending_checkpoints(last)) } - pub fn get_pending_checkpoint( - &self, - index: &CheckpointHeight, - ) -> SuiResult> { - Ok(self.tables()?.pending_checkpoints_v2.get(index)?) + pub fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> SuiResult { + Ok(self + .consensus_quarantine + .read() + .pending_checkpoint_exists(index)) } pub fn process_pending_checkpoint( &self, commit_height: CheckpointHeight, content_info: Vec<(CheckpointSummary, CheckpointContents)>, - ) -> SuiResult<()> { - let tables = self.tables()?; - // All created checkpoints are inserted in builder_checkpoint_summary in a single batch. - // This means that upon restart we can use BuilderCheckpointSummary::commit_height - // from the last built summary to resume building checkpoints. - let mut batch = tables.pending_checkpoints_v2.batch(); + ) { + let mut consensus_quarantine = self.consensus_quarantine.write(); for (position_in_commit, (summary, transactions)) in content_info.into_iter().enumerate() { let sequence_number = summary.sequence_number; let summary = BuilderCheckpointSummary { @@ -3812,34 +3862,14 @@ impl AuthorityPerEpochStore { checkpoint_height: Some(commit_height), position_in_commit, }; - batch.insert_batch( - &tables.builder_checkpoint_summary_v2, - [(&sequence_number, summary)], - )?; - batch.insert_batch( - &tables.builder_digest_to_checkpoint, - transactions - .iter() - .map(|tx| (tx.transaction, sequence_number)), - )?; - batch.delete_batch( - &tables.user_signatures_for_checkpoints, - transactions.iter().map(|tx| tx.transaction), - )?; + consensus_quarantine.insert_builder_summary(sequence_number, summary, transactions); } - // find all pending checkpoints <= commit_height and remove them - let iter = tables - .pending_checkpoints_v2 - .safe_range_iter(0..=commit_height); - let keys = iter - .map(|c| c.map(|(h, _)| h)) - .collect::, _>>()?; - - batch.delete_batch(&tables.pending_checkpoints_v2, &keys)?; - - Ok(batch.write()?) + // Because builder can run behind state sync, the data may be immediately ready to be committed. + consensus_quarantine + .commit(self) + .expect("commit cannot fail"); } /// Register genesis checkpoint in builder DB @@ -3873,6 +3903,10 @@ impl AuthorityPerEpochStore { pub fn last_built_checkpoint_builder_summary( &self, ) -> SuiResult> { + if let Some(summary) = self.consensus_quarantine.read().last_built_summary() { + return Ok(Some(summary.clone())); + } + Ok(self .tables()? .builder_checkpoint_summary_v2 @@ -3885,19 +3919,41 @@ impl AuthorityPerEpochStore { pub fn last_built_checkpoint_summary( &self, ) -> SuiResult> { - Ok(self - .tables()? - .builder_checkpoint_summary_v2 - .unbounded_iter() - .skip_to_last() - .next() - .map(|(seq, s)| (seq, s.summary))) + if let Some(BuilderCheckpointSummary { summary, .. 
}) = + self.consensus_quarantine.read().last_built_summary() + { + let seq = *summary.sequence_number(); + debug!( + "returning last_built_summary from consensus quarantine: {:?}", + seq + ); + Ok(Some((seq, summary.clone()))) + } else { + let seq = self + .tables()? + .builder_checkpoint_summary_v2 + .unbounded_iter() + .skip_to_last() + .next() + .map(|(seq, s)| (seq, s.summary)); + debug!( + "returning last_built_summary from builder_checkpoint_summary_v2: {:?}", + seq + ); + Ok(seq) + } } pub fn get_built_checkpoint_summary( &self, sequence: CheckpointSequenceNumber, ) -> SuiResult> { + if let Some(BuilderCheckpointSummary { summary, .. }) = + self.consensus_quarantine.read().get_built_summary(sequence) + { + return Ok(Some(summary.clone())); + } + Ok(self .tables()? .builder_checkpoint_summary_v2 @@ -3909,10 +3965,40 @@ impl AuthorityPerEpochStore { &self, digests: impl Iterator, ) -> SuiResult> { - Ok(self + let size_hint = digests.size_hint().0; + let mut results = Vec::with_capacity(size_hint); + let mut fallback_keys = Vec::with_capacity(size_hint); + let mut fallback_indices = Vec::with_capacity(size_hint); + + { + let consensus_quarantine = self.consensus_quarantine.read(); + + for (i, digest) in digests.enumerate() { + if consensus_quarantine.included_transaction_in_checkpoint(digest) { + results.push(true); + } else { + results.push(false); + fallback_keys.push(digest); + fallback_indices.push(i); + } + } + } + + let fallback_results = self .tables()? .builder_digest_to_checkpoint - .multi_contains_keys(digests)?) + .multi_contains_keys(fallback_keys)?; + + assert_eq!(fallback_results.len(), fallback_indices.len()); + + for (result, i) in fallback_results + .into_iter() + .zip(fallback_indices.into_iter()) + { + results[i] = result; + } + + Ok(results) } pub fn get_last_checkpoint_signature_index(&self) -> SuiResult { @@ -4059,24 +4145,416 @@ impl AuthorityPerEpochStore { } } +mod quarantine { + use super::*; + + /// ConsensusOutputQuarantine holds outputs of consensus processing in memory until the checkpoints + /// for the commit have been certified. + pub(crate) struct ConsensusOutputQuarantine { + // Output from consensus handler + output_queue: VecDeque, + + // Highest known certified checkpoint sequence number + highest_executed_checkpoint: CheckpointSequenceNumber, + + // Checkpoint Builder output + builder_checkpoint_summary: + BTreeMap, + + builder_digest_to_checkpoint: HashMap, + + // Any un-committed next versions are stored here. A ref-count is used to + // track which objects still exist in some element of output_queue. + shared_object_next_versions: HashMap, + } + + impl ConsensusOutputQuarantine { + pub(super) fn new(highest_executed_checkpoint: CheckpointSequenceNumber) -> Self { + Self { + highest_executed_checkpoint, + + output_queue: VecDeque::new(), + builder_checkpoint_summary: BTreeMap::new(), + builder_digest_to_checkpoint: HashMap::new(), + //next_checkpoint_sequence_to_commit: None, + shared_object_next_versions: HashMap::new(), + } + } + } + + // Write methods - all methods in this block insert new data into the quarantine. + // There are only two sources! ConsensusHandler and CheckpointBuilder. + impl ConsensusOutputQuarantine { + // Push all data gathered from a consensus commit into the quarantine. + pub(super) fn push_consensus_output(&mut self, output: ConsensusCommitOutput) { + self.insert_shared_object_next_versions(&output); + self.output_queue.push_back(output); + } + + // Record a newly built checkpoint. 
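+ // The summary and contents are held in memory here until the corresponding checkpoint has been certified and executed, at which point commit_with_batch persists them to the epoch tables.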
+ pub(super) fn insert_builder_summary( + &mut self, + sequence_number: CheckpointSequenceNumber, + summary: BuilderCheckpointSummary, + contents: CheckpointContents, + ) { + debug!(?sequence_number, "inserting builder summary {:?}", summary); + for tx in contents.iter() { + debug!(?sequence_number, "inserting tx {:?}", tx.transaction); + self.builder_digest_to_checkpoint + .insert(tx.transaction, sequence_number); + } + self.builder_checkpoint_summary + .insert(sequence_number, (summary, contents)); + } + } + + // Commit methods. + impl ConsensusOutputQuarantine { + /// Update the highest executed checkpoint and commit any data which is now + /// below the watermark. + pub(super) fn update_highest_executed_checkpoint( + &mut self, + checkpoint: CheckpointSequenceNumber, + epoch_store: &AuthorityPerEpochStore, + batch: &mut DBBatch, + ) -> SuiResult { + self.highest_executed_checkpoint = checkpoint; + self.commit_with_batch(epoch_store, batch) + } + + pub(super) fn commit(&mut self, epoch_store: &AuthorityPerEpochStore) -> SuiResult { + let mut batch = epoch_store.db_batch()?; + self.commit_with_batch(epoch_store, &mut batch)?; + batch.write()?; + Ok(()) + } + + /// Commit all data below the watermark. + fn commit_with_batch( + &mut self, + epoch_store: &AuthorityPerEpochStore, + batch: &mut DBBatch, + ) -> SuiResult { + // The commit algorithm is simple: + // 1. First commit all checkpoint builder state which is below the watermark. + // 2. Determine the consensus commit height that corresponds to the highest committed + // checkpoint. + // 3. Commit all consensus output at that height or below. + + let tables = epoch_store.tables()?; + + let mut highest_committed_height = None; + + while self + .builder_checkpoint_summary + .first_key_value() + .map(|(seq, _)| *seq <= self.highest_executed_checkpoint) + == Some(true) + { + let (seq, (builder_summary, contents)) = + self.builder_checkpoint_summary.pop_first().unwrap(); + + for tx in contents.iter() { + let digest = &tx.transaction; + debug!("removing tx {:?}", tx.transaction); + assert_eq!( + self.builder_digest_to_checkpoint + .remove(digest) + .unwrap_or_else(|| { + panic!( + "transaction {:?} not found in builder_digest_to_checkpoint", + digest + ) + }), + seq + ); + } + + batch.insert_batch( + &tables.builder_digest_to_checkpoint, + contents.iter().map(|tx| (tx.transaction, seq)), + )?; + + batch.insert_batch( + &tables.builder_checkpoint_summary_v2, + [(seq, &builder_summary)], + )?; + + let checkpoint_height = builder_summary + .checkpoint_height + .expect("non-genesis checkpoint must have height"); + if let Some(highest) = highest_committed_height { + assert!(checkpoint_height > highest); + } + + highest_committed_height = Some(checkpoint_height); + } + + let Some(highest_committed_height) = highest_committed_height else { + return Ok(()); + }; + + while !self.output_queue.is_empty() { + // A consensus commit can have more than one pending checkpoint (a regular one and a randomness one). + // We can only write the consensus commit if the highest pending checkpoint associated with it has + // been processed by the builder. + let Some(highest_in_commit) = self + .output_queue + .front() + .unwrap() + .get_highest_pending_checkpoint_height() + else { + // if highest is none, we have already written the pending checkpoint for the final epoch, + // so there is no more data that needs to be committed.
+ break; }; + if highest_in_commit <= highest_committed_height { + info!( + "committing output with highest pending checkpoint height {:?}", + highest_in_commit + ); + let output = self.output_queue.pop_front().unwrap(); + self.remove_shared_object_next_versions(&output); + output.write_to_batch(epoch_store, batch)?; + } else { + break; + } + } + + Ok(()) + } + } + + // Read methods - all methods in this block return data from the quarantine which would otherwise + // be found in the database. + impl ConsensusOutputQuarantine { + pub(super) fn last_built_summary(&self) -> Option<&BuilderCheckpointSummary> { + self.builder_checkpoint_summary + .values() + .last() + .map(|(summary, _)| summary) + } + + pub(super) fn get_built_summary( + &self, + sequence: CheckpointSequenceNumber, + ) -> Option<&BuilderCheckpointSummary> { + self.builder_checkpoint_summary + .get(&sequence) + .map(|(summary, _)| summary) + } + + pub(super) fn included_transaction_in_checkpoint( + &self, + digest: &TransactionDigest, + ) -> bool { + self.builder_digest_to_checkpoint.contains_key(digest) + } + + pub(super) fn is_consensus_message_processed( + &self, + key: &SequencedConsensusTransactionKey, + ) -> bool { + self.output_queue + .iter() + .any(|output| output.consensus_messages_processed.contains(key)) + } + + pub(super) fn has_last_consensus_stats(&self) -> bool { + self.output_queue + .iter() + .any(|output| output.has_last_consensus_stats()) + } + + pub(super) fn get_next_shared_object_versions( + &self, + tables: &AuthorityEpochTables, + object_ids: &[ObjectID], + ) -> SuiResult>> { + let mut results = Vec::with_capacity(object_ids.len()); + let mut fallback_keys = Vec::with_capacity(object_ids.len()); + let mut fallback_indices = Vec::with_capacity(object_ids.len()); + + for (i, object_id) in object_ids.iter().enumerate() { + if let Some((_, next_version)) = self.shared_object_next_versions.get(object_id) { + results.push(Some(*next_version)); + } else { + results.push(None); + fallback_keys.push(object_id); + fallback_indices.push(i); + } + } + + let fallback_results = tables + .next_shared_object_versions + .multi_get(fallback_keys)?; + assert_eq!(fallback_results.len(), fallback_indices.len()); + for (i, result) in fallback_indices.into_iter().zip(fallback_results) { + results[i] = result; + } + + Ok(results) + } + + pub(super) fn insert_shared_object_next_versions( + &mut self, + output: &ConsensusCommitOutput, + ) { + if let Some(next_versions) = output.next_shared_object_versions.as_ref() { + for (object_id, next_version) in next_versions { + let entry = self.shared_object_next_versions.entry(*object_id); + match entry { + hash_map::Entry::Occupied(mut entry) => { + let (ref_count, v) = entry.get_mut(); + *ref_count += 1; + *v = *next_version; + } + hash_map::Entry::Vacant(entry) => { + entry.insert((1, *next_version)); + } + } + } + } + } + + pub(super) fn remove_shared_object_next_versions( + &mut self, + output: &ConsensusCommitOutput, + ) { + if let Some(next_versions) = output.next_shared_object_versions.as_ref() { + for object_id in next_versions.keys() { + let entry = self.shared_object_next_versions.entry(*object_id); + match entry { + hash_map::Entry::Occupied(mut entry) => { + let (ref_count, _) = entry.get_mut(); + *ref_count -= 1; + if *ref_count == 0 { + entry.remove(); + } + } + hash_map::Entry::Vacant(_) => { + panic!("Shared object next version not found"); + } + } + } + } + } + + pub(super) fn get_highest_pending_checkpoint_height(&self) -> Option { + self.output_queue + .back() +
.and_then(|output| output.get_highest_pending_checkpoint_height()) + } + + pub(super) fn get_pending_checkpoints( + &self, + last: Option, + ) -> Vec<(CheckpointHeight, PendingCheckpointV2)> { + let mut checkpoints = Vec::new(); + for output in &self.output_queue { + checkpoints.extend( + output + .get_pending_checkpoints(last) + .map(|cp| (cp.height(), cp.clone())), + ); + } + if cfg!(debug_assertions) { + let mut prev = None; + for (height, _) in &checkpoints { + if let Some(prev) = prev { + assert!(prev < *height); + } + prev = Some(*height); + } + } + checkpoints + } + + pub(super) fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool { + self.output_queue + .iter() + .any(|output| output.pending_checkpoint_exists(index)) + } + + pub(super) fn get_new_jwks( + &self, + epoch_store: &AuthorityPerEpochStore, + round: u64, + ) -> SuiResult> { + let epoch = epoch_store.epoch(); + + // Check if the requested round is in memory + for output in self.output_queue.iter().rev() { + // unwrap safe because output will always have last consensus stats set before being added + // to the quarantine + let output_round = output.get_round().unwrap(); + if round == output_round { + return Ok(output + .active_jwks + .iter() + .map(|(_, (jwk_id, jwk))| ActiveJwk { + jwk_id: jwk_id.clone(), + jwk: jwk.clone(), + epoch, + }) + .collect()); + } + } + + // Fall back to reading from database + let empty_jwk_id = JwkId::new(String::new(), String::new()); + let empty_jwk = JWK { + kty: String::new(), + e: String::new(), + n: String::new(), + alg: String::new(), + }; + + let start = (round, (empty_jwk_id.clone(), empty_jwk.clone())); + let end = (round + 1, (empty_jwk_id, empty_jwk)); + + Ok(epoch_store + .tables()? + .active_jwks + .safe_iter_with_bounds(Some(start), Some(end)) + .map_ok(|((r, (jwk_id, jwk)), _)| { + debug_assert!(round == r); + ActiveJwk { jwk_id, jwk, epoch } + }) + .collect::, _>>()?) 
+ } + + pub(super) fn get_randomness_last_round_timestamp(&self) -> Option { + self.output_queue + .iter() + .rev() + .filter_map(|output| output.get_randomness_last_round_timestamp()) + .next() + } + } +} + +use quarantine::ConsensusOutputQuarantine; + #[derive(Default)] pub(crate) struct ConsensusCommitOutput { // Consensus and reconfig state - consensus_messages_processed: BTreeSet, - end_of_publish: BTreeSet, - reconfig_state: Option, - consensus_commit_stats: Option, - pending_execution: Vec, + consensus_messages_processed: BTreeSet, // done + end_of_publish: BTreeSet, // done + reconfig_state: Option, // done + consensus_commit_stats: Option, // done // transaction scheduling state - shared_object_versions: Option<(AssignedTxAndVersions, HashMap)>, + next_shared_object_versions: Option>, deferred_txns: Vec<(DeferralKey, Vec)>, // deferred txns that have been loaded and can be removed deleted_deferred_txns: BTreeSet, // checkpoint state - user_signatures_for_checkpoints: Vec<(TransactionDigest, Vec)>, + //user_signatures_for_checkpoints: Vec<(TransactionDigest, Vec)>, pending_checkpoints: Vec, // random beacon state @@ -4093,34 +4571,64 @@ pub(crate) struct ConsensusCommitOutput { } impl ConsensusCommitOutput { - pub fn new() -> Self { - Default::default() + fn get_randomness_last_round_timestamp(&self) -> Option { + self.next_randomness_round.as_ref().map(|(_, ts)| *ts) } - fn insert_end_of_publish(&mut self, authority: AuthorityName) { - self.end_of_publish.insert(authority); + fn get_highest_pending_checkpoint_height(&self) -> Option { + // TODO: can we simply get the height of the last checkpoint in the list? + self.pending_checkpoints.iter().map(|cp| cp.height()).max() + } + + fn get_pending_checkpoints( + &self, + last: Option, + ) -> impl Iterator { + self.pending_checkpoints.iter().filter(move |cp| { + if let Some(last) = last { + cp.height() > last + } else { + true + } + }) } - fn insert_pending_execution(&mut self, transactions: &[VerifiedExecutableTransaction]) { - self.pending_execution.reserve(transactions.len()); - self.pending_execution.extend_from_slice(transactions); + fn pending_checkpoint_exists(&self, index: &CheckpointHeight) -> bool { + self.pending_checkpoints + .iter() + .any(|cp| cp.height() == *index) } - fn insert_user_signatures_for_checkpoints( - &mut self, - transactions: &[VerifiedExecutableTransaction], - ) { - self.user_signatures_for_checkpoints.extend( - transactions - .iter() - .map(|tx| (*tx.digest(), tx.tx_signatures().to_vec())), - ); + fn has_last_consensus_stats(&self) -> bool { + self.consensus_commit_stats.is_some() + } +} + +impl ConsensusCommitOutput { + pub fn new() -> Self { + Default::default() + } + + fn get_round(&self) -> Option { + self.consensus_commit_stats + .as_ref() + .map(|stats| stats.index.last_committed_round) + } + + fn insert_end_of_publish(&mut self, authority: AuthorityName) { + self.end_of_publish.insert(authority); } fn record_consensus_commit_stats(&mut self, stats: ExecutionIndicesWithStats) { self.consensus_commit_stats = Some(stats); } + // in testing code we often need to write to the db outside of a consensus commit + #[cfg(any(test, feature = "test-utils"))] + pub(crate) fn set_default_commit_stats_for_testing(&mut self) { + self.record_consensus_commit_stats(Default::default()); + } + fn store_reconfig_state(&mut self, state: ReconfigState) { self.reconfig_state = Some(state); } @@ -4129,13 +4637,12 @@ impl ConsensusCommitOutput { self.consensus_messages_processed.insert(key); } - fn 
set_assigned_shared_object_versions( + fn set_next_shared_object_versions( &mut self, - versions: AssignedTxAndVersions, next_versions: HashMap, ) { - assert!(self.shared_object_versions.is_none()); - self.shared_object_versions = Some((versions, next_versions)); + assert!(self.next_shared_object_versions.is_none()); + self.next_shared_object_versions = Some(next_versions); } fn defer_transactions( @@ -4214,44 +4721,23 @@ impl ConsensusCommitOutput { )?; } - if let Some(consensus_commit_stats) = &self.consensus_commit_stats { - batch.insert_batch( - &tables.last_consensus_stats, - [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)], - )?; - } + let consensus_commit_stats = self + .consensus_commit_stats + .expect("consensus_commit_stats must be set"); + let round = consensus_commit_stats.index.last_committed_round; batch.insert_batch( - &tables.pending_execution, - self.pending_execution - .into_iter() - .map(|tx| (*tx.inner().digest(), tx.serializable())), + &tables.last_consensus_stats, + [(LAST_CONSENSUS_STATS_ADDR, consensus_commit_stats)], )?; - if let Some((assigned_versions, next_versions)) = self.shared_object_versions { - batch.insert_batch( - &tables.assigned_shared_object_versions_v2, - assigned_versions, - )?; - + if let Some(next_versions) = self.next_shared_object_versions { batch.insert_batch(&tables.next_shared_object_versions, next_versions)?; } batch.delete_batch(&tables.deferred_transactions, self.deleted_deferred_txns)?; batch.insert_batch(&tables.deferred_transactions, self.deferred_txns)?; - batch.insert_batch( - &tables.user_signatures_for_checkpoints, - self.user_signatures_for_checkpoints, - )?; - - batch.insert_batch( - &tables.pending_checkpoints_v2, - self.pending_checkpoints - .into_iter() - .map(|cp| (cp.height(), cp)), - )?; - if let Some((round, commit_timestamp)) = self.next_randomness_round { batch.insert_batch(&tables.randomness_next_round, [(SINGLETON_KEY, round)])?; batch.insert_batch( @@ -4282,7 +4768,11 @@ impl ConsensusCommitOutput { )?; batch.insert_batch( &tables.active_jwks, - self.active_jwks.into_iter().map(|j| (j, ())), + self.active_jwks.into_iter().map(|j| { + // TODO: we don't need to store the round in this map if it is invariant + assert_eq!(j.0, round); + (j, ()) + }), )?; Ok(()) @@ -4290,15 +4780,10 @@ impl ConsensusCommitOutput { } impl GetSharedLocks for AuthorityPerEpochStore { - fn get_shared_locks( - &self, - key: &TransactionKey, - ) -> Result, SuiError> { - Ok(self - .tables()? - .assigned_shared_object_versions_v2 - .get(key)? - .unwrap_or_default()) + fn get_shared_locks(&self, key: &TransactionKey) -> Option> { + self.shared_version_assignments + .get(key) + .map(|locks| locks.clone()) } } diff --git a/crates/sui-core/src/authority/authority_store.rs b/crates/sui-core/src/authority/authority_store.rs index cb4dc555d975c..d81df6ae5321e 100644 --- a/crates/sui-core/src/authority/authority_store.rs +++ b/crates/sui-core/src/authority/authority_store.rs @@ -972,18 +972,13 @@ impl AuthorityStore { Ok(()) } - /// Commits transactions only to the db. Called by checkpoint builder. See - /// ExecutionCache::commit_transactions for more info - pub(crate) fn commit_transactions( - &self, - transactions: &[(TransactionDigest, VerifiedTransaction)], - ) -> SuiResult { + /// Commits transactions only (not effects or other transaction outputs) to the db. 
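+ /// Used for transactions (such as randomness state updates) whose data cannot be reconstructed after a crash and must therefore be durable before they are enqueued for execution.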
+ /// See ExecutionCache::persist_transaction for more info + pub(crate) fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult { let mut batch = self.perpetual_tables.transactions.batch(); batch.insert_batch( &self.perpetual_tables.transactions, - transactions - .iter() - .map(|(digest, tx)| (*digest, tx.serializable_ref())), + [(tx.digest(), tx.clone().into_unsigned().serializable_ref())], )?; batch.write()?; Ok(()) diff --git a/crates/sui-core/src/authority/authority_test_utils.rs b/crates/sui-core/src/authority/authority_test_utils.rs index a55c820334da5..8fde4e9cf0164 100644 --- a/crates/sui-core/src/authority/authority_test_utils.rs +++ b/crates/sui-core/src/authority/authority_test_utils.rs @@ -408,6 +408,7 @@ pub async fn send_consensus(authority: &AuthorityState, cert: &VerifiedCertifica vec![transaction], &Arc::new(CheckpointServiceNoop {}), authority.get_object_cache_reader().as_ref(), + authority.get_transaction_cache_reader().as_ref(), &authority.metrics, true, ) @@ -432,6 +433,7 @@ pub async fn send_consensus_no_execution(authority: &AuthorityState, cert: &Veri vec![transaction], &Arc::new(CheckpointServiceNoop {}), authority.get_object_cache_reader().as_ref(), + authority.get_transaction_cache_reader().as_ref(), &authority.metrics, true, ) @@ -462,6 +464,7 @@ pub async fn send_batch_consensus_no_execution( transactions, &Arc::new(CheckpointServiceNoop {}), authority.get_object_cache_reader().as_ref(), + authority.get_transaction_cache_reader().as_ref(), &authority.metrics, skip_consensus_commit_prologue_in_test, ) diff --git a/crates/sui-core/src/authority/test_authority_builder.rs b/crates/sui-core/src/authority/test_authority_builder.rs index f0a0df27bf354..f8edf8c0e349e 100644 --- a/crates/sui-core/src/authority/test_authority_builder.rs +++ b/crates/sui-core/src/authority/test_authority_builder.rs @@ -232,6 +232,8 @@ impl<'a> TestAuthorityBuilder<'a> { let cache_traits = build_execution_cache(&epoch_start_configuration, ®istry, &authority_store); + let checkpoint_store = CheckpointStore::new(&path.join("checkpoints")); + let epoch_store = AuthorityPerEpochStore::new( name, Arc::new(genesis_committee.clone()), @@ -245,6 +247,10 @@ impl<'a> TestAuthorityBuilder<'a> { signature_verifier_metrics, &expensive_safety_checks, ChainIdentifier::from(*genesis.checkpoint().digest()), + checkpoint_store + .get_highest_executed_checkpoint_seq_number() + .unwrap() + .unwrap_or(0), ); let committee_store = Arc::new(CommitteeStore::new( path.join("epochs"), @@ -252,7 +258,6 @@ impl<'a> TestAuthorityBuilder<'a> { None, )); - let checkpoint_store = CheckpointStore::new(&path.join("checkpoints")); if self.insert_genesis_checkpoint { checkpoint_store.insert_genesis_checkpoint( genesis.checkpoint(), diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs index f04ea86367464..c21e7798c577d 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs @@ -28,6 +28,7 @@ use std::{ use either::Either; use futures::stream::FuturesOrdered; use itertools::izip; +use mysten_common::fatal; use mysten_metrics::spawn_monitored_task; use sui_config::node::{CheckpointExecutorConfig, RunWithRange}; use sui_macros::{fail_point, fail_point_async}; @@ -441,7 +442,7 @@ impl CheckpointExecutor { .expect("commit_transaction_outputs cannot fail"); epoch_store - .handle_committed_transactions(all_tx_digests) + 
.handle_finalized_checkpoint(checkpoint.data(), all_tx_digests) .expect("cannot fail"); if !checkpoint.is_last_checkpoint_of_epoch() { @@ -1163,7 +1164,7 @@ fn get_unexecuted_transactions( .enumerate() .map(|(i, (tx, expected_effects_digest))| { let tx = tx.unwrap_or_else(|| - panic!( + fatal!( "state-sync should have ensured that transaction with digest {:?} exists for checkpoint: {checkpoint:?}", unexecuted_txns[i] ) diff --git a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs index 544d08d11e778..f82e2084bb5dd 100644 --- a/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs +++ b/crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs @@ -203,13 +203,11 @@ pub async fn test_checkpoint_executor_cross_epoch() { .unwrap(); // We should have synced up to epoch boundary - assert_eq!( - checkpoint_store - .get_highest_executed_checkpoint_seq_number() - .unwrap() - .unwrap(), - num_to_sync_per_epoch as u64, - ); + let highest_executed_ckpt = checkpoint_store + .get_highest_executed_checkpoint_seq_number() + .unwrap() + .unwrap(); + assert_eq!(highest_executed_ckpt, num_to_sync_per_epoch as u64,); let first_epoch = 0; @@ -236,6 +234,7 @@ pub async fn test_checkpoint_executor_cross_epoch() { .unwrap(), accumulator, &ExpensiveSafetyCheckConfig::default(), + highest_executed_ckpt, ) .await .unwrap(); diff --git a/crates/sui-core/src/checkpoints/mod.rs b/crates/sui-core/src/checkpoints/mod.rs index d9aed27ff48f3..70a864613cc00 100644 --- a/crates/sui-core/src/checkpoints/mod.rs +++ b/crates/sui-core/src/checkpoints/mod.rs @@ -21,13 +21,13 @@ use diffy::create_patch; use futures::future::{select, Either}; use futures::FutureExt; use itertools::Itertools; +use mysten_common::fatal; use mysten_metrics::{monitored_scope, spawn_monitored_task, MonitoredFutureExt}; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use sui_macros::fail_point; use sui_network::default_mysten_network_config; use sui_types::base_types::ConciseableName; -use sui_types::executable_transaction::VerifiedExecutableTransaction; use sui_types::messages_checkpoint::CheckpointCommitment; use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait; @@ -471,7 +471,7 @@ impl CheckpointStore { ?local_contents, "Local checkpoint fork detected!", ); - panic!( + fatal!( "Local checkpoint fork detected for sequence number: {}", local_checkpoint.sequence_number() ); @@ -740,109 +740,6 @@ impl CheckpointStore { self.watermarks.rocksdb.flush()?; Ok(()) } - - /// Re-executes all transactions from all local, uncertified checkpoints for crash recovery. - /// All transactions thus re-executed are guaranteed to not have any missing dependencies, - /// because we start from the highest executed checkpoint, and proceed through checkpoints in - /// order. 
- #[instrument(level = "debug", skip_all)] - pub async fn reexecute_local_checkpoints( - &self, - state: &AuthorityState, - epoch_store: &AuthorityPerEpochStore, - ) { - info!("rexecuting locally computed checkpoints for crash recovery"); - let epoch = epoch_store.epoch(); - let highest_executed = self - .get_highest_executed_checkpoint_seq_number() - .expect("get_highest_executed_checkpoint_seq_number should not fail") - .unwrap_or(0); - - let Some(highest_built) = self.get_latest_locally_computed_checkpoint() else { - info!("no locally built checkpoints to verify"); - return; - }; - - for seq in highest_executed + 1..=*highest_built.sequence_number() { - info!(?seq, "Re-executing locally computed checkpoint"); - let Some(checkpoint) = self - .get_locally_computed_checkpoint(seq) - .expect("get_locally_computed_checkpoint should not fail") - else { - panic!("locally computed checkpoint {:?} not found", seq); - }; - - let Some(contents) = self - .get_checkpoint_contents(&checkpoint.content_digest) - .expect("get_checkpoint_contents should not fail") - else { - panic!("checkpoint contents not found for locally computed checkpoint {:?} (digest: {:?})", seq, checkpoint.content_digest); - }; - - let cache = state.get_transaction_cache_reader(); - - let tx_digests: Vec<_> = contents.iter().map(|digests| digests.transaction).collect(); - let fx_digests: Vec<_> = contents.iter().map(|digests| digests.effects).collect(); - let txns = cache - .multi_get_transaction_blocks(&tx_digests) - .expect("multi_get_transaction_blocks should not fail"); - for (tx, digest) in txns.iter().zip(tx_digests.iter()) { - if tx.is_none() { - panic!("transaction {:?} not found", digest); - } - } - - let txns: Vec<_> = txns - .into_iter() - .map(|tx| tx.unwrap()) - .zip(fx_digests.into_iter()) - // end of epoch transaction can only be executed by CheckpointExecutor - .filter(|(tx, _)| !tx.data().transaction_data().is_end_of_epoch_tx()) - .map(|(tx, fx)| { - ( - VerifiedExecutableTransaction::new_from_checkpoint( - (*tx).clone(), - epoch, - seq, - ), - fx, - ) - }) - .collect(); - - let tx_digests: Vec<_> = txns.iter().map(|(tx, _)| *tx.digest()).collect(); - - info!( - ?seq, - ?tx_digests, - "Re-executing transactions for locally built checkpoint" - ); - // this will panic if any re-execution diverges from the previously recorded effects digest - state.enqueue_with_expected_effects_digest(txns, epoch_store); - - // a task that logs every so often until it is cancelled - // This should normally finish very quickly, so seeing this log more than once or twice is - // likely a sign of a problem. 
- let waiting_logger = tokio::task::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - loop { - interval.tick().await; - warn!(?seq, "Still waiting for re-execution to complete"); - } - }); - - cache - .notify_read_executed_effects_digests(&tx_digests) - .await - .expect("notify_read_executed_effects_digests should not fail"); - - waiting_logger.abort(); - waiting_logger.await.ok(); - info!(?seq, "Re-execution completed for locally built checkpoint"); - } - - info!("Re-execution of locally built checkpoints completed"); - } } #[derive(Copy, Clone, Debug, Serialize, Deserialize)] @@ -922,7 +819,12 @@ impl CheckpointBuilder { } } - async fn run(mut self) { + async fn run( + mut self, + startup_wait: tokio::sync::oneshot::Receiver>, + ) { + info!("CheckpointBuilder waiting for startup signal"); + let mut notify = Some(startup_wait.await.unwrap()); info!("Starting CheckpointBuilder"); loop { // Check whether an exit signal has been received, if so we break the loop. @@ -936,6 +838,10 @@ impl CheckpointBuilder { self.maybe_build_checkpoints().await; + if let Some(notify) = notify.take() { + notify.send(()).unwrap(); + } + match select(self.exit.changed().boxed(), self.notify.notified().boxed()).await { Either::Left(_) => { // break loop on exit signal @@ -1097,7 +1003,7 @@ impl CheckpointBuilder { // No other dependencies of this consensus commit prologue that haven't been included // in any previous checkpoint. - assert_eq!(unsorted_ccp.len(), 1); + assert_eq!(unsorted_ccp.len(), 1, "unsorted_ccp: {:?}", unsorted_ccp); assert_eq!(unsorted_ccp[0].transaction_digest(), ccp_digest); } consensus_commit_prologue @@ -1184,11 +1090,24 @@ impl CheckpointBuilder { contents_digest = ?contents.digest(), "writing checkpoint", ); - all_tx_digests.extend(contents.iter().map(|digests| digests.transaction)); - self.output - .checkpoint_created(summary, contents, &self.epoch_store, &self.tables) - .await?; + if let Some(previously_computed_summary) = self + .tables + .locally_computed_checkpoints + .get(&summary.sequence_number)? + { + if previously_computed_summary != *summary { + // Panic so that we don't send out an equivocating checkpoint sig. + fatal!( + "Checkpoint {} was previously built with a different result: {:?} vs {:?}", + summary.sequence_number, + previously_computed_summary, + summary + ); + } + } + + all_tx_digests.extend(contents.iter().map(|digests| digests.transaction)); self.metrics .transactions_included_in_checkpoint @@ -1209,23 +1128,15 @@ impl CheckpointBuilder { )?; } - // Durably commit transactions (but not their outputs) to the database. - // Called before writing a locally built checkpoint to the CheckpointStore, so that - // the inputs of the checkpoint cannot be lost. - // These transactions are guaranteed to be final unless this validator - // forks (i.e. constructs a checkpoint which will never be certified). In this case - // some non-final transactions could be left in the database. - // - // This is an intermediate solution until we delay commits to the epoch db. After - // we have done that, crash recovery will be done by re-processing consensus commits - // and pending_consensus_transactions, and this method can be removed. - self.state - .get_cache_commit() - .persist_transactions(&all_tx_digests) - .await?; - batch.write()?; + // Send all checkpoint sigs to consensus. 
+ for (summary, contents) in &new_checkpoints { + self.output + .checkpoint_created(summary, contents, &self.epoch_store, &self.tables) + .await?; + } + for (local_checkpoint, _) in &new_checkpoints { if let Some(certified_checkpoint) = self .tables @@ -1239,7 +1150,7 @@ impl CheckpointBuilder { self.notify_aggregator.notify_one(); self.epoch_store - .process_pending_checkpoint(height, new_checkpoints)?; + .process_pending_checkpoint(height, new_checkpoints); Ok(()) } @@ -1377,7 +1288,7 @@ impl CheckpointBuilder { } self.epoch_store - .consensus_messages_processed_notify(transaction_keys) + .consensus_messages_processed_notify_for_checkpoint(transaction_keys) .await?; } @@ -2240,7 +2151,11 @@ impl CheckpointService { metrics: Arc, max_transactions_per_checkpoint: usize, max_checkpoint_size_bytes: usize, - ) -> (Arc, watch::Sender<()> /* The exit sender */) { + ) -> ( + Arc, + watch::Sender<()>, /* The exit sender */ + tokio::sync::oneshot::Sender>, /* builder start-up sender */ + ) { info!( "Starting checkpoint service with {max_transactions_per_checkpoint} max_transactions_per_checkpoint and {max_checkpoint_size_bytes} max_checkpoint_size_bytes" ); @@ -2265,7 +2180,9 @@ impl CheckpointService { ); let epoch_store_clone = epoch_store.clone(); - spawn_monitored_task!(epoch_store_clone.within_alive_epoch(builder.run())); + let (tx, rx) = tokio::sync::oneshot::channel(); + + spawn_monitored_task!(epoch_store_clone.within_alive_epoch(builder.run(rx))); let aggregator = CheckpointAggregator::new( checkpoint_store.clone(), @@ -2291,7 +2208,7 @@ impl CheckpointService { last_signature_index, metrics, }); - (service, exit_snd) + (service, exit_snd, tx) } #[cfg(test)] @@ -2304,9 +2221,8 @@ impl CheckpointService { let mut output = ConsensusCommitOutput::new(); epoch_store.write_pending_checkpoint(&mut output, &checkpoint)?; - let mut batch = epoch_store.db_batch_for_test(); - output.write_to_batch(epoch_store, &mut batch)?; - batch.write()?; + output.set_default_commit_stats_for_testing(); + epoch_store.push_consensus_output_for_tests(output); self.notify_checkpoint()?; Ok(()) } @@ -2535,7 +2451,7 @@ mod tests { &epoch_store, )); - let (checkpoint_service, _exit) = CheckpointService::spawn( + let (checkpoint_service, _exit, startup) = CheckpointService::spawn( state.clone(), checkpoint_store, epoch_store.clone(), @@ -2547,6 +2463,9 @@ mod tests { 3, 100_000, ); + let (tx, rx) = tokio::sync::oneshot::channel(); + startup.send(tx).unwrap(); + rx.await.unwrap(); checkpoint_service .write_and_notify_checkpoint_for_testing(&epoch_store, p(0, vec![4], 0)) diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs index d00b8273f1d02..615fd36683538 100644 --- a/crates/sui-core/src/consensus_handler.rs +++ b/crates/sui-core/src/consensus_handler.rs @@ -48,7 +48,7 @@ use crate::{ consensus_types::consensus_output_api::{ parse_block_transactions, ConsensusCommitAPI, ParsedTransaction, }, - execution_cache::ObjectCacheRead, + execution_cache::{ObjectCacheRead, TransactionCacheRead}, scoring_decision::update_low_scoring_authorities, transaction_manager::TransactionManager, }; @@ -104,6 +104,7 @@ impl ConsensusHandlerInitializer { self.checkpoint_service.clone(), self.state.transaction_manager().clone(), self.state.get_object_cache_reader().clone(), + self.state.get_transaction_cache_reader().clone(), self.low_scoring_authorities.clone(), consensus_committee, self.state.metrics.clone(), @@ -127,6 +128,8 @@ pub struct ConsensusHandler { checkpoint_service: Arc, /// cache 
reader is needed when determining the next version to assign for shared objects. cache_reader: Arc, + /// used to read randomness transactions during crash recovery + tx_reader: Arc, /// Reputation scores used by consensus adapter that we update, forwarded from consensus low_scoring_authorities: Arc>>, /// The consensus committee used to do stake computations for deciding set of low scoring authorities @@ -150,6 +153,7 @@ impl ConsensusHandler { checkpoint_service: Arc, transaction_manager: Arc, cache_reader: Arc, + tx_reader: Arc, low_scoring_authorities: Arc>>, committee: ConsensusCommittee, metrics: Arc, @@ -170,6 +174,7 @@ impl ConsensusHandler { last_consensus_stats, checkpoint_service, cache_reader, + tx_reader, low_scoring_authorities, committee, metrics, @@ -411,6 +416,7 @@ impl ConsensusHandler { &self.last_consensus_stats, &self.checkpoint_service, self.cache_reader.as_ref(), + self.tx_reader.as_ref(), &ConsensusCommitInfo::new(self.epoch_store.protocol_config(), &consensus_commit), &self.metrics, ) @@ -1019,6 +1025,7 @@ mod tests { Arc::new(CheckpointServiceNoop {}), state.transaction_manager().clone(), state.get_object_cache_reader().clone(), + state.get_transaction_cache_reader().clone(), Arc::new(ArcSwap::default()), consensus_committee.clone(), metrics, diff --git a/crates/sui-core/src/epoch/randomness.rs b/crates/sui-core/src/epoch/randomness.rs index 952ea5952d8f1..6bf4d06d01305 100644 --- a/crates/sui-core/src/epoch/randomness.rs +++ b/crates/sui-core/src/epoch/randomness.rs @@ -667,12 +667,11 @@ impl RandomnessManager { output: &mut ConsensusCommitOutput, ) -> SuiResult> { let epoch_store = self.epoch_store()?; - let tables = epoch_store.tables()?; - let last_round_timestamp = tables - .randomness_last_round_timestamp - .get(&SINGLETON_KEY) - .expect("typed_store should not fail"); + let last_round_timestamp = epoch_store + .get_randomness_last_round_timestamp() + .expect("read should not fail"); + if let Some(last_round_timestamp) = last_round_timestamp { if commit_timestamp - last_round_timestamp < epoch_store diff --git a/crates/sui-core/src/execution_cache.rs b/crates/sui-core/src/execution_cache.rs index 83cca46df4ccb..940e7d92b5b13 100644 --- a/crates/sui-core/src/execution_cache.rs +++ b/crates/sui-core/src/execution_cache.rs @@ -21,6 +21,7 @@ use sui_types::base_types::VerifiedExecutionData; use sui_types::digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest}; use sui_types::effects::{TransactionEffects, TransactionEvents}; use sui_types::error::{SuiError, SuiResult, UserInputError}; +use sui_types::executable_transaction::VerifiedExecutableTransaction; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use sui_types::object::Object; use sui_types::storage::{ @@ -179,20 +180,10 @@ pub trait ExecutionCacheCommit: Send + Sync { digests: &'a [TransactionDigest], ) -> BoxFuture<'a, SuiResult>; - /// Durably commit transactions (but not their outputs) to the database. - /// Called before writing a locally built checkpoint to the CheckpointStore, so that - /// the inputs of the checkpoint cannot be lost. - /// These transactions are guaranteed to be final unless this validator - /// forks (i.e. constructs a checkpoint which will never be certified). In this case - /// some non-final transactions could be left in the database. - /// - /// This is an intermediate solution until we delay commits to the epoch db. 
After - /// we have done that, crash recovery will be done by re-processing consensus commits - /// and pending_consensus_transactions, and this method can be removed. - fn persist_transactions<'a>( - &'a self, - digests: &'a [TransactionDigest], - ) -> BoxFuture<'a, SuiResult>; + /// Durably commit a transaction to the database. Used to store any transactions + /// that cannot be reconstructed at start-up by consensus replay. Currently the only + /// case of this is RandomnessTransaction. + fn persist_transaction(&self, transaction: &VerifiedExecutableTransaction) -> SuiResult; } pub trait ObjectCacheRead: Send + Sync { diff --git a/crates/sui-core/src/execution_cache/passthrough_cache.rs b/crates/sui-core/src/execution_cache/passthrough_cache.rs index 1518bda18a0d6..395e8d7f939ad 100644 --- a/crates/sui-core/src/execution_cache/passthrough_cache.rs +++ b/crates/sui-core/src/execution_cache/passthrough_cache.rs @@ -22,6 +22,7 @@ use sui_types::bridge::{get_bridge, Bridge}; use sui_types::digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest}; use sui_types::effects::{TransactionEffects, TransactionEvents}; use sui_types::error::{SuiError, SuiResult}; +use sui_types::executable_transaction::VerifiedExecutableTransaction; use sui_types::message_envelope::Message; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use sui_types::object::Object; @@ -340,9 +341,8 @@ impl ExecutionCacheCommit for PassthroughCache { async { Ok(()) }.boxed() } - fn persist_transactions(&self, _digests: &[TransactionDigest]) -> BoxFuture<'_, SuiResult> { - // Nothing needs to be done since they were already committed in write_transaction_outputs - async { Ok(()) }.boxed() + fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult { + self.store.persist_transaction(tx) } } diff --git a/crates/sui-core/src/execution_cache/proxy_cache.rs b/crates/sui-core/src/execution_cache/proxy_cache.rs index def5bf824190f..022a0331f9349 100644 --- a/crates/sui-core/src/execution_cache/proxy_cache.rs +++ b/crates/sui-core/src/execution_cache/proxy_cache.rs @@ -22,6 +22,7 @@ use sui_types::bridge::Bridge; use sui_types::digests::{TransactionDigest, TransactionEffectsDigest, TransactionEventsDigest}; use sui_types::effects::{TransactionEffects, TransactionEvents}; use sui_types::error::{SuiError, SuiResult}; +use sui_types::executable_transaction::VerifiedExecutableTransaction; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use sui_types::object::Object; use sui_types::storage::{MarkerValue, ObjectKey, ObjectOrTombstone, PackageObject}; @@ -314,11 +315,8 @@ impl ExecutionCacheCommit for ProxyCache { delegate_method!(self.commit_transaction_outputs(epoch, digests)) } - fn persist_transactions<'a>( - &'a self, - digests: &'a [TransactionDigest], - ) -> BoxFuture<'a, SuiResult> { - delegate_method!(self.persist_transactions(digests)) + fn persist_transaction(&self, transaction: &VerifiedExecutableTransaction) -> SuiResult { + delegate_method!(self.persist_transaction(transaction)) } } diff --git a/crates/sui-core/src/execution_cache/writeback_cache.rs b/crates/sui-core/src/execution_cache/writeback_cache.rs index 8431429b683db..b16b6e3bee190 100644 --- a/crates/sui-core/src/execution_cache/writeback_cache.rs +++ b/crates/sui-core/src/execution_cache/writeback_cache.rs @@ -67,6 +67,7 @@ use sui_types::digests::{ }; use sui_types::effects::{TransactionEffects, TransactionEvents}; use sui_types::error::{SuiError, SuiResult, UserInputError}; +use 
sui_types::executable_transaction::VerifiedExecutableTransaction; use sui_types::message_envelope::Message; use sui_types::messages_checkpoint::CheckpointSequenceNumber; use sui_types::object::Object; @@ -998,33 +999,6 @@ impl WritebackCache { } } - async fn persist_transactions(&self, digests: &[TransactionDigest]) -> SuiResult { - let mut txns = Vec::with_capacity(digests.len()); - for tx_digest in digests { - let Some(tx) = self - .dirty - .pending_transaction_writes - .get(tx_digest) - .map(|o| o.transaction.clone()) - else { - // tx should exist in the db if it is not in dirty set. - debug_assert!(self - .store - .get_transaction_block(tx_digest) - .unwrap() - .is_some()); - // If the transaction is not in dirty, it does not need to be committed. - // This situation can happen if we build a checkpoint locally which was just executed - // via state sync. - continue; - }; - - txns.push((*tx_digest, (*tx).clone())); - } - - self.store.commit_transactions(&txns) - } - // Move the oldest/least entry from the dirty queue to the cache queue. // This is called after the entry is committed to the db. fn move_version_from_dirty_to_cache( @@ -1192,11 +1166,8 @@ impl ExecutionCacheCommit for WritebackCache { WritebackCache::commit_transaction_outputs(self, epoch, digests).boxed() } - fn persist_transactions<'a>( - &'a self, - digests: &'a [TransactionDigest], - ) -> BoxFuture<'a, SuiResult> { - WritebackCache::persist_transactions(self, digests).boxed() + fn persist_transaction(&self, tx: &VerifiedExecutableTransaction) -> SuiResult { + self.store.persist_transaction(tx) } } diff --git a/crates/sui-core/src/execution_driver.rs b/crates/sui-core/src/execution_driver.rs index ca433a8a4d383..9154c32ea9db1 100644 --- a/crates/sui-core/src/execution_driver.rs +++ b/crates/sui-core/src/execution_driver.rs @@ -86,6 +86,16 @@ pub async fn execution_process( let digest = *certificate.digest(); trace!(?digest, "Pending certificate execution activated."); + if epoch_store.epoch() != certificate.epoch() { + info!( + ?digest, + cur_epoch = epoch_store.epoch(), + cert_epoch = certificate.epoch(), + "Ignoring certificate from previous epoch." + ); + continue; + } + let limit = limit.clone(); // hold semaphore permit until task completes. unwrap ok because we never close // the semaphore in this context. 
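Note on the persist_transaction hunks above: the old batch persist_transactions path is replaced by a per-transaction call that durably writes only the transaction body (never its effects or other outputs), and both cache implementations simply delegate to the store. The sketch below is only an illustration of that contract under stated assumptions; Digest, ExecutableTx, PerpetualStore, CacheCommit and WritebackLikeCache are made-up stand-ins, not the sui-core types or API.

use std::collections::HashMap;
use std::sync::Mutex;

// Stand-ins for TransactionDigest / VerifiedExecutableTransaction; not sui-core types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Digest(u64);

#[derive(Clone)]
struct ExecutableTx {
    digest: Digest,
    bytes: Vec<u8>, // serialized, unsigned transaction body
}

// Durable store: only the transaction itself is written, never its effects or outputs.
#[derive(Default)]
struct PerpetualStore {
    transactions: Mutex<HashMap<Digest, Vec<u8>>>,
}

impl PerpetualStore {
    fn persist_transaction(&self, tx: &ExecutableTx) -> Result<(), String> {
        self.transactions
            .lock()
            .map_err(|e| e.to_string())?
            .insert(tx.digest, tx.bytes.clone());
        Ok(())
    }
}

// Commit-side trait: persistence is synchronous and per-transaction, reserved for
// transactions that consensus replay cannot regenerate after a restart.
trait CacheCommit {
    fn persist_transaction(&self, tx: &ExecutableTx) -> Result<(), String>;
}

struct WritebackLikeCache {
    store: PerpetualStore,
}

impl CacheCommit for WritebackLikeCache {
    fn persist_transaction(&self, tx: &ExecutableTx) -> Result<(), String> {
        // Delegate straight to the durable store, mirroring the cache impls above.
        self.store.persist_transaction(tx)
    }
}

fn main() {
    let cache = WritebackLikeCache { store: PerpetualStore::default() };
    let tx = ExecutableTx { digest: Digest(1), bytes: vec![0u8; 4] };
    cache.persist_transaction(&tx).expect("persist should succeed");
}

The delegation keeps the decision of what must survive a crash at the call site, while the store remains a dumb digest-to-bytes write.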
diff --git a/crates/sui-core/src/mock_consensus.rs b/crates/sui-core/src/mock_consensus.rs index 6d784c22b406a..e638ef2d7acec 100644 --- a/crates/sui-core/src/mock_consensus.rs +++ b/crates/sui-core/src/mock_consensus.rs @@ -66,6 +66,7 @@ impl MockConsensusClient { vec![SequencedConsensusTransaction::new_test(tx.clone())], &checkpoint_service, validator.get_object_cache_reader().as_ref(), + validator.get_transaction_cache_reader().as_ref(), &authority_metrics, true, ) diff --git a/crates/sui-core/src/transaction_input_loader.rs b/crates/sui-core/src/transaction_input_loader.rs index b9f1028598c88..5e5c0f9fd37da 100644 --- a/crates/sui-core/src/transaction_input_loader.rs +++ b/crates/sui-core/src/transaction_input_loader.rs @@ -7,7 +7,7 @@ use once_cell::unsync::OnceCell; use std::collections::HashMap; use std::sync::Arc; use sui_types::{ - base_types::{EpochId, ObjectID, ObjectRef, SequenceNumber, TransactionDigest}, + base_types::{EpochId, ObjectRef, TransactionDigest}, error::{SuiError, SuiResult, UserInputError}, storage::{GetSharedLocks, ObjectKey}, transaction::{ @@ -15,7 +15,7 @@ use sui_types::{ ReceivingObjectReadResult, ReceivingObjectReadResultKind, ReceivingObjects, TransactionKey, }, }; -use tracing::instrument; +use tracing::{error, instrument}; pub(crate) struct TransactionInputLoader { cache: Arc, @@ -129,7 +129,7 @@ impl TransactionInputLoader { input_object_kinds: &[InputObjectKind], epoch_id: EpochId, ) -> SuiResult { - let shared_locks_cell: OnceCell> = OnceCell::new(); + let shared_locks_cell: OnceCell>> = OnceCell::new(); let mut results = vec![None; input_object_kinds.len()]; let mut object_keys = Vec::with_capacity(input_object_kinds.len()); @@ -153,17 +153,26 @@ impl TransactionInputLoader { fetches.push((i, input)); } InputObjectKind::SharedMoveObject { id, .. } => { - let shared_locks = shared_locks_cell.get_or_try_init(|| { - Ok::, SuiError>( + let shared_locks = shared_locks_cell + .get_or_init(|| { shared_lock_store - .get_shared_locks(tx_key)? - .into_iter() - .collect(), - ) - })?; - // If we can't find the locked version, it means - // 1. either we have a bug that skips shared object version assignment - // 2. or we have some DB corruption + .get_shared_locks(tx_key) + .map(|locks| locks.into_iter().collect()) + }) + .as_ref() + // TODO: because we are holding the tx lock here, we can probably make this a panic, + // because that should make it impossible for a concurrent execution attempt to delete + // the shared version assignments before we have a chance to read them. + .ok_or_else(|| { + error!("Failed to get shared locks for transaction {tx_key:?}"); + SuiError::GenericAuthorityError { + error: format!( + "Failed to get shared locks for transaction {tx_key:?}" + ), + } + })?; + + // If we find a set of locks but an object is missing, it indicates a serious inconsistency: let version = shared_locks.get(id).unwrap_or_else(|| { panic!("Shared object locks should have been set. 
key: {tx_key:?}, obj id: {id:?}") }); diff --git a/crates/sui-core/src/transaction_manager.rs b/crates/sui-core/src/transaction_manager.rs index 0ffd0c7a92b51..ece5c8a7c5b08 100644 --- a/crates/sui-core/src/transaction_manager.rs +++ b/crates/sui-core/src/transaction_manager.rs @@ -9,6 +9,7 @@ use std::{ }; use lru::LruCache; +use mysten_common::fatal; use mysten_metrics::monitored_scope; use parking_lot::RwLock; use sui_types::{ @@ -342,15 +343,13 @@ impl TransactionManager { tx_ready_certificates: UnboundedSender, metrics: Arc, ) -> TransactionManager { - let transaction_manager = TransactionManager { + TransactionManager { object_cache_read, transaction_cache_read, metrics: metrics.clone(), inner: RwLock::new(RwLock::new(Inner::new(epoch_store.epoch(), metrics))), tx_ready_certificates, - }; - transaction_manager.enqueue(epoch_store.all_pending_execution().unwrap(), epoch_store); - transaction_manager + } } /// Enqueues certificates / verified transactions into TransactionManager. Once all of the input objects are available @@ -413,9 +412,7 @@ impl TransactionManager { if self .transaction_cache_read .is_tx_already_executed(&digest) - .unwrap_or_else(|err| { - panic!("Failed to check if tx is already executed: {:?}", err) - }) + .expect("is_tx_already_executed cannot fail") { self.metrics .transaction_manager_num_enqueued_certificates @@ -432,7 +429,7 @@ impl TransactionManager { let mut receiving_objects: HashSet = HashSet::new(); let certs: Vec<_> = certs .into_iter() - .map(|(cert, fx_digest)| { + .filter_map(|(cert, fx_digest)| { let input_object_kinds = cert .data() .intent_message() @@ -440,7 +437,24 @@ impl TransactionManager { .input_objects() .expect("input_objects() cannot fail"); let mut input_object_keys = - epoch_store.get_input_object_keys(&cert.key(), &input_object_kinds); + match epoch_store.get_input_object_keys(&cert.key(), &input_object_kinds) { + Ok(keys) => keys, + Err(e) => { + // Because we do not hold the transaction lock during enqueue, it is possible + // that the transaction was executed and the shared version assignments deleted + // since the earlier check. This is a rare race condition, and it is better to + // handle it ad-hoc here than to hold tx locks for every cert for the duration + // of this function in order to remove the race. 
+ if self + .transaction_cache_read + .is_tx_already_executed(cert.digest()) + .expect("is_tx_already_executed cannot fail") + { + return None; + } + fatal!("Failed to get input object keys: {:?}", e); + } + }; if input_object_kinds.len() != input_object_keys.len() { error!("Duplicated input objects: {:?}", input_object_kinds); @@ -467,7 +481,7 @@ impl TransactionManager { } } - (cert, fx_digest, input_object_keys) + Some((cert, fx_digest, input_object_keys)) }) .collect(); diff --git a/crates/sui-core/src/unit_tests/authority_tests.rs b/crates/sui-core/src/unit_tests/authority_tests.rs index 70ec3e454c5de..feb0cc67670d8 100644 --- a/crates/sui-core/src/unit_tests/authority_tests.rs +++ b/crates/sui-core/src/unit_tests/authority_tests.rs @@ -5980,9 +5980,7 @@ async fn test_consensus_handler_per_object_congestion_control( } else { epoch_store.get_highest_pending_checkpoint_height() }; - let deferred_txns = epoch_store - .get_all_deferred_transactions_for_test() - .unwrap(); + let deferred_txns = epoch_store.get_all_deferred_transactions_for_test(); assert_eq!(deferred_txns.len(), 1); assert_eq!(deferred_txns[0].1.len(), 3); let deferral_key = deferred_txns[0].0; @@ -6043,8 +6041,7 @@ async fn test_consensus_handler_per_object_congestion_control( let deferred_txns = authority .epoch_store_for_testing() - .get_all_deferred_transactions_for_test() - .unwrap(); + .get_all_deferred_transactions_for_test(); assert_eq!(deferred_txns.len(), 1); assert_eq!(deferred_txns[0].1.len(), 1); let deferral_key = deferred_txns[0].0; @@ -6072,7 +6069,6 @@ async fn test_consensus_handler_per_object_congestion_control( assert!(authority .epoch_store_for_testing() .get_all_deferred_transactions_for_test() - .unwrap() .is_empty()); } @@ -6208,7 +6204,6 @@ async fn test_consensus_handler_congestion_control_transaction_cancellation() { assert!(authority .epoch_store_for_testing() .get_all_deferred_transactions_for_test() - .unwrap() .is_empty()); // Check cancelled transaction shared locks. 
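The transaction_manager.rs hunk just above tolerates a narrow race: enqueue does not hold the transaction lock, so a certificate can finish executing (and have its shared version assignments cleaned up) between the early executed check and the input-key lookup. The snippet below is a rough, self-contained sketch of that control flow only; lookup_input_keys and already_executed are hypothetical stand-ins for the epoch-store and transaction-cache calls, and plain panic! stands in for fatal!.

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct Digest(u64);

#[derive(Clone, Debug)]
struct Cert {
    digest: Digest,
}

#[derive(Debug)]
enum LookupError {
    MissingSharedVersionAssignments,
}

// Stand-in for the shared-version / input-key lookup done per certificate.
fn lookup_input_keys(cert: &Cert) -> Result<Vec<u64>, LookupError> {
    let _ = cert;
    Err(LookupError::MissingSharedVersionAssignments)
}

// Stand-in for the executed-transaction check against the cache.
fn already_executed(digest: &Digest) -> bool {
    let _ = digest;
    true
}

fn enqueue(certs: Vec<Cert>) -> Vec<(Cert, Vec<u64>)> {
    certs
        .into_iter()
        .filter_map(|cert| match lookup_input_keys(&cert) {
            Ok(keys) => Some((cert, keys)),
            Err(e) => {
                // The lookup can only fail this way if the certificate was executed
                // concurrently and its assignments were already removed; anything
                // else is an invariant violation.
                if already_executed(&cert.digest) {
                    None // nothing left to schedule, drop the certificate
                } else {
                    panic!("failed to get input object keys: {e:?}");
                }
            }
        })
        .collect()
}

fn main() {
    // A cert whose assignments are gone but which is already executed is silently dropped.
    assert!(enqueue(vec![Cert { digest: Digest(7) }]).is_empty());
}

Handling the miss at the point of failure keeps the common path lock-free, at the cost of re-checking execution status in the rare error branch.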
diff --git a/crates/sui-core/src/unit_tests/consensus_tests.rs b/crates/sui-core/src/unit_tests/consensus_tests.rs index 46247ae04edf1..a7152f1f65133 100644 --- a/crates/sui-core/src/unit_tests/consensus_tests.rs +++ b/crates/sui-core/src/unit_tests/consensus_tests.rs @@ -220,6 +220,7 @@ pub fn make_consensus_adapter_for_test( vec![tx], &checkpoint_service, self.state.get_object_cache_reader().as_ref(), + self.state.get_transaction_cache_reader().as_ref(), &self.state.metrics, true, ) @@ -238,6 +239,7 @@ pub fn make_consensus_adapter_for_test( vec![tx], &checkpoint_service, self.state.get_object_cache_reader().as_ref(), + self.state.get_transaction_cache_reader().as_ref(), &self.state.metrics, true, ) diff --git a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs index 00e8a77436624..bea7bce3c9bec 100644 --- a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs @@ -33,7 +33,7 @@ pub fn checkpoint_service_for_testing(state: Arc) -> Arc(10); - let (checkpoint_service, _) = CheckpointService::spawn( + let (checkpoint_service, _, startup) = CheckpointService::spawn( state.clone(), state.get_checkpoint_store().clone(), epoch_store.clone(), @@ -45,6 +45,11 @@ pub fn checkpoint_service_for_testing(state: Arc) -> Arc, sui_tx_validator_metrics: Arc, ) -> Result { - let (checkpoint_service, checkpoint_service_exit) = Self::start_checkpoint_service( - config, - consensus_adapter.clone(), - checkpoint_store, - epoch_store.clone(), - state.clone(), - state_sync_handle, - accumulator, - checkpoint_metrics.clone(), - ); + let (checkpoint_service, checkpoint_service_exit, startup_sender) = + Self::start_checkpoint_service( + config, + consensus_adapter.clone(), + checkpoint_store, + epoch_store.clone(), + state.clone(), + state_sync_handle, + accumulator, + checkpoint_metrics.clone(), + ); // create a new map that gets injected into both the consensus handler and the consensus adapter // the consensus handler will write values forwarded from consensus, and the consensus adapter @@ -1331,6 +1332,14 @@ impl SuiNode { ) .await; + info!("consensus manager started"); + let (tx, rx) = tokio::sync::oneshot::channel(); + startup_sender + .send(tx) + .expect("Failed to send startup signal"); + + rx.await.expect("Failed to receive startup signal"); + if epoch_store.authenticator_state_enabled() { Self::start_jwk_updater( config, @@ -1362,7 +1371,11 @@ impl SuiNode { state_sync_handle: state_sync::Handle, accumulator: Weak, checkpoint_metrics: Arc, - ) -> (Arc, watch::Sender<()>) { + ) -> ( + Arc, + watch::Sender<()>, + tokio::sync::oneshot::Sender>, + ) { let epoch_start_timestamp_ms = epoch_store.epoch_start_state().epoch_start_timestamp_ms(); let epoch_duration_ms = epoch_store.epoch_start_state().epoch_duration_ms(); @@ -1813,6 +1826,15 @@ impl SuiNode { .expect("Error loading last checkpoint for current epoch") .expect("Could not load last checkpoint for current epoch"); + let last_checkpoint_seq = *last_checkpoint.sequence_number(); + + assert_eq!( + Some(last_checkpoint_seq), + self.checkpoint_store + .get_highest_executed_checkpoint_seq_number() + .expect("Error loading highest executed checkpoint sequence number") + ); + let epoch_start_configuration = EpochStartConfiguration::new( next_epoch_start_system_state, *last_checkpoint.digest(), @@ -1830,6 +1852,7 @@ impl SuiNode { epoch_start_configuration, accumulator, &self.config.expensive_safety_check_config, + 
last_checkpoint_seq, ) .await .expect("Reconfigure authority state cannot fail"); diff --git a/crates/sui-single-node-benchmark/src/mock_storage.rs b/crates/sui-single-node-benchmark/src/mock_storage.rs index c6f03fe1bb8bd..954f60d61617c 100644 --- a/crates/sui-single-node-benchmark/src/mock_storage.rs +++ b/crates/sui-single-node-benchmark/src/mock_storage.rs @@ -48,7 +48,7 @@ impl InMemoryObjectStore { tx_key: &TransactionKey, input_object_kinds: &[InputObjectKind], ) -> SuiResult { - let shared_locks_cell: OnceCell> = OnceCell::new(); + let shared_locks_cell: OnceCell>> = OnceCell::new(); let mut input_objects = Vec::new(); for kind in input_object_kinds { let obj: Option = match kind { @@ -58,11 +58,16 @@ impl InMemoryObjectStore { } InputObjectKind::SharedMoveObject { id, .. } => { - let shared_locks = shared_locks_cell.get_or_try_init(|| { - Ok::, SuiError>( - shared_locks.get_shared_locks(tx_key)?.into_iter().collect(), - ) - })?; + let shared_locks = shared_locks_cell + .get_or_init(|| { + shared_locks + .get_shared_locks(tx_key) + .map(|l| l.into_iter().collect()) + }) + .as_ref() + .ok_or_else(|| SuiError::GenericAuthorityError { + error: "Shared object locks should have been set.".to_string(), + })?; let version = shared_locks.get(id).unwrap_or_else(|| { panic!("Shared object locks should have been set. key: {tx_key:?}, obj id: {id:?}") }); @@ -171,10 +176,7 @@ impl ParentSync for InMemoryObjectStore { } impl GetSharedLocks for InMemoryObjectStore { - fn get_shared_locks( - &self, - _key: &TransactionKey, - ) -> Result, SuiError> { + fn get_shared_locks(&self, _key: &TransactionKey) -> Option> { unreachable!() } } diff --git a/crates/sui-types/src/storage/mod.rs b/crates/sui-types/src/storage/mod.rs index 93cb31330eb24..e7d7505591e18 100644 --- a/crates/sui-types/src/storage/mod.rs +++ b/crates/sui-types/src/storage/mod.rs @@ -606,8 +606,5 @@ where } pub trait GetSharedLocks: Send + Sync { - fn get_shared_locks( - &self, - key: &TransactionKey, - ) -> Result, SuiError>; + fn get_shared_locks(&self, key: &TransactionKey) -> Option>; }
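Taken together with the authority_per_epoch_store.rs hunk earlier, the storage/mod.rs change turns shared-version lookup from a fallible DB read into an infallible in-memory lookup that returns None when nothing has been assigned, leaving the caller to decide whether a miss is an error. Below is a compact sketch of that shape under stated assumptions: it re-declares a simplified GetSharedLocks rather than using the sui_types trait, uses a plain RwLock<HashMap<..>> instead of the concurrent map in the actual change, and TxKey, ObjId and Version are stand-ins for TransactionKey, ObjectID and SequenceNumber.

use std::collections::HashMap;
use std::sync::RwLock;

// Stand-in aliases for TransactionKey, ObjectID and SequenceNumber.
type TxKey = u64;
type ObjId = u32;
type Version = u64;

trait GetSharedLocks {
    // None means no assignment has been recorded for this transaction;
    // the caller decides whether that is an error.
    fn get_shared_locks(&self, key: &TxKey) -> Option<Vec<(ObjId, Version)>>;
}

// In-memory assignment table standing in for the epoch store's map.
#[derive(Default)]
struct EpochStoreLike {
    shared_version_assignments: RwLock<HashMap<TxKey, Vec<(ObjId, Version)>>>,
}

impl GetSharedLocks for EpochStoreLike {
    fn get_shared_locks(&self, key: &TxKey) -> Option<Vec<(ObjId, Version)>> {
        self.shared_version_assignments
            .read()
            .unwrap()
            .get(key)
            .cloned()
    }
}

fn main() {
    let store = EpochStoreLike::default();
    store
        .shared_version_assignments
        .write()
        .unwrap()
        .insert(42, vec![(7, 3)]);

    // Caller-side handling mirroring the input-loader hunk: a missing assignment
    // becomes a recoverable error, while a present map that lacks a specific object
    // would indicate a deeper inconsistency.
    let locks: HashMap<ObjId, Version> = store
        .get_shared_locks(&42)
        .map(|v| v.into_iter().collect())
        .ok_or("shared versions not assigned yet")
        .unwrap();
    assert_eq!(locks.get(&7), Some(&3));
}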