diff --git a/crates/sui-graphql-rpc/src/types/transaction_block.rs b/crates/sui-graphql-rpc/src/types/transaction_block.rs index 5843d88460cb5..979ea62f36b0c 100644 --- a/crates/sui-graphql-rpc/src/types/transaction_block.rs +++ b/crates/sui-graphql-rpc/src/types/transaction_block.rs @@ -8,14 +8,14 @@ use async_graphql::{ dataloader::Loader, *, }; -use diesel::{BoolExpressionMethods, ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; +use diesel::{ExpressionMethods, JoinOnDsl, QueryDsl, SelectableHelper}; use fastcrypto::encoding::{Base58, Encoding}; use serde::{Deserialize, Serialize}; use sui_indexer::{ models::transactions::StoredTransaction, schema::{ - transactions, tx_calls, tx_changed_objects, tx_digests, tx_input_objects, tx_recipients, - tx_senders, + transactions, tx_calls_fun, tx_changed_objects, tx_digests, tx_input_objects, + tx_recipients, tx_senders, }, }; use sui_types::{ @@ -317,15 +317,15 @@ impl TransactionBlock { let mut query = tx::dsl::transactions.into_boxed(); if let Some(f) = &filter.function { - let sub_query = tx_calls::dsl::tx_calls - .select(tx_calls::dsl::tx_sequence_number) + let sub_query = tx_calls_fun::dsl::tx_calls_fun + .select(tx_calls_fun::dsl::tx_sequence_number) .into_boxed(); query = query.filter(tx::dsl::tx_sequence_number.eq_any(f.apply( sub_query, - tx_calls::dsl::package, - tx_calls::dsl::module, - tx_calls::dsl::func, + tx_calls_fun::dsl::package, + tx_calls_fun::dsl::module, + tx_calls_fun::dsl::func, ))); } @@ -505,9 +505,7 @@ impl Loader for Db { let transactions: Vec = self .execute(move |conn| { conn.results(move || { - let join = ds::cp_sequence_number - .eq(tx::checkpoint_sequence_number) - .and(ds::tx_sequence_number.eq(tx::tx_sequence_number)); + let join = ds::tx_sequence_number.eq(tx::tx_sequence_number); tx::transactions .inner_join(ds::tx_digests.on(join)) diff --git a/crates/sui-indexer/migrations/2023-08-19-044020_events/up.sql b/crates/sui-indexer/migrations/2023-08-19-044020_events/up.sql index a6c0d70566e7b..dfbfa3ea14495 100644 --- a/crates/sui-indexer/migrations/2023-08-19-044020_events/up.sql +++ b/crates/sui-indexer/migrations/2023-08-19-044020_events/up.sql @@ -1,3 +1,4 @@ +-- TODO: modify queries in indexer reader to take advantage of the new indices CREATE TABLE events ( tx_sequence_number BIGINT NOT NULL, @@ -23,8 +24,8 @@ CREATE TABLE events timestamp_ms BIGINT NOT NULL, -- bcs of the Event contents (Event.contents) bcs BYTEA NOT NULL, - PRIMARY KEY(tx_sequence_number, event_sequence_number, checkpoint_sequence_number) -) PARTITION BY RANGE (checkpoint_sequence_number); + PRIMARY KEY(tx_sequence_number, event_sequence_number) +) PARTITION BY RANGE (tx_sequence_number); CREATE TABLE events_partition_0 PARTITION OF events FOR VALUES FROM (0) TO (MAXVALUE); CREATE INDEX events_package ON events (package, tx_sequence_number, event_sequence_number); CREATE INDEX events_package_module ON events (package, module, tx_sequence_number, event_sequence_number); diff --git a/crates/sui-indexer/migrations/2023-08-19-044026_transactions/down.sql b/crates/sui-indexer/migrations/2023-08-19-044026_transactions/down.sql index e5850457f922e..15e9dc9f1cb82 100644 --- a/crates/sui-indexer/migrations/2023-08-19-044026_transactions/down.sql +++ b/crates/sui-indexer/migrations/2023-08-19-044026_transactions/down.sql @@ -1,2 +1,3 @@ -- This file should undo anything in `up.sql` DROP TABLE IF EXISTS transactions; +DROP TABLE IF EXISTS transactions_partition_0; diff --git 
a/crates/sui-indexer/migrations/2023-08-19-044026_transactions/up.sql b/crates/sui-indexer/migrations/2023-08-19-044026_transactions/up.sql index ede66ad44798c..f5404e3610751 100644 --- a/crates/sui-indexer/migrations/2023-08-19-044026_transactions/up.sql +++ b/crates/sui-indexer/migrations/2023-08-19-044026_transactions/up.sql @@ -18,10 +18,6 @@ CREATE TABLE transactions ( -- number of successful commands in this transaction, bound by number of command -- in a programmaable transaction. success_command_count smallint NOT NULL, - PRIMARY KEY (tx_sequence_number, checkpoint_sequence_number) -) PARTITION BY RANGE (checkpoint_sequence_number); + PRIMARY KEY (tx_sequence_number) +) PARTITION BY RANGE (tx_sequence_number); CREATE TABLE transactions_partition_0 PARTITION OF transactions FOR VALUES FROM (0) TO (MAXVALUE); -CREATE INDEX transactions_transaction_digest ON transactions (transaction_digest); -CREATE INDEX transactions_checkpoint_sequence_number ON transactions (checkpoint_sequence_number); --- only create index for system transactions (0). See types.rs -CREATE INDEX transactions_transaction_kind ON transactions (transaction_kind) WHERE transaction_kind = 0; diff --git a/crates/sui-indexer/migrations/2023-08-19-044044_checkpoints/up.sql b/crates/sui-indexer/migrations/2023-08-19-044044_checkpoints/up.sql index 5f7281a2e1a1d..ddb63b020de70 100644 --- a/crates/sui-indexer/migrations/2023-08-19-044044_checkpoints/up.sql +++ b/crates/sui-indexer/migrations/2023-08-19-044044_checkpoints/up.sql @@ -1,15 +1,15 @@ CREATE TABLE checkpoints ( - sequence_number bigint PRIMARY KEY, - checkpoint_digest bytea NOT NULL, - epoch bigint NOT NULL, + sequence_number BIGINT PRIMARY KEY, + checkpoint_digest BYTEA NOT NULL, + epoch BIGINT NOT NULL, -- total transactions in the network at the end of this checkpoint (including itself) - network_total_transactions bigint NOT NULL, - previous_checkpoint_digest bytea, + network_total_transactions BIGINT NOT NULL, + previous_checkpoint_digest BYTEA, -- if this checkpoitn is the last checkpoint of an epoch end_of_epoch boolean NOT NULL, -- array of TranscationDigest in bytes included in this checkpoint - tx_digests bytea[] NOT NULL, + tx_digests BYTEA[] NOT NULL, timestamp_ms BIGINT NOT NULL, total_gas_cost BIGINT NOT NULL, computation_cost BIGINT NOT NULL, @@ -17,11 +17,13 @@ CREATE TABLE checkpoints storage_rebate BIGINT NOT NULL, non_refundable_storage_fee BIGINT NOT NULL, -- bcs serialized Vec bytes - checkpoint_commitments bytea NOT NULL, + checkpoint_commitments BYTEA NOT NULL, -- bcs serialized AggregateAuthoritySignature bytes - validator_signature bytea NOT NULL, + validator_signature BYTEA NOT NULL, -- bcs serialzied EndOfEpochData bytes, if the checkpoint marks end of an epoch - end_of_epoch_data bytea + end_of_epoch_data BYTEA, + min_tx_sequence_number BIGINT, + max_tx_sequence_number BIGINT ); CREATE INDEX checkpoints_epoch ON checkpoints (epoch, sequence_number); diff --git a/crates/sui-indexer/migrations/2023-08-19-060729_packages/up.sql b/crates/sui-indexer/migrations/2023-08-19-060729_packages/up.sql index a95489af4dc41..f08a5549608eb 100644 --- a/crates/sui-indexer/migrations/2023-08-19-060729_packages/up.sql +++ b/crates/sui-indexer/migrations/2023-08-19-060729_packages/up.sql @@ -1,6 +1,14 @@ -CREATE TABLE packages +CREATE TABLE packages ( - package_id bytea PRIMARY KEY, + package_id bytea NOT NULL, + original_id bytea NOT NULL, + package_version bigint NOT NULL, -- bcs serialized MovePackage - move_package bytea NOT NULL + move_package bytea 
NOT NULL, + checkpoint_sequence_number bigint NOT NULL, + CONSTRAINT packages_pkey PRIMARY KEY (package_id, original_id, package_version), + CONSTRAINT packages_unique_package_id UNIQUE (package_id) ); + +CREATE INDEX packages_cp_id_version ON packages (checkpoint_sequence_number, original_id, package_version); +CREATE INDEX packages_id_version_cp ON packages (original_id, package_version, checkpoint_sequence_number); diff --git a/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/down.sql b/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/down.sql index 8e4f29f981c22..f5604c0db5357 100644 --- a/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/down.sql +++ b/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/down.sql @@ -2,5 +2,8 @@ DROP TABLE IF EXISTS tx_senders; DROP TABLE IF EXISTS tx_recipients; DROP TABLE IF EXISTS tx_input_objects; DROP TABLE IF EXISTS tx_changed_objects; -DROP TABLE IF EXISTS tx_calls; +DROP TABLE IF EXISTS tx_calls_pkg; +DROP TABLE IF EXISTS tx_calls_mod; +DROP TABLE IF EXISTS tx_calls_fun; DROP TABLE IF EXISTS tx_digests; +DROP TABLE IF EXISTS tx_kinds; diff --git a/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/up.sql b/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/up.sql index ed81a281f2b0a..0bcd824e31254 100644 --- a/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/up.sql +++ b/crates/sui-indexer/migrations/2023-10-06-204335_tx_indices/up.sql @@ -1,57 +1,70 @@ CREATE TABLE tx_senders ( - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL, - -- SuiAddress in bytes. sender BYTEA NOT NULL, - PRIMARY KEY(sender, tx_sequence_number, cp_sequence_number) + PRIMARY KEY(sender, tx_sequence_number) ); -CREATE INDEX tx_senders_tx_sequence_number_index ON tx_senders (tx_sequence_number, cp_sequence_number); CREATE TABLE tx_recipients ( - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL, - -- SuiAddress in bytes. recipient BYTEA NOT NULL, - PRIMARY KEY(recipient, tx_sequence_number, cp_sequence_number) + sender BYTEA NOT NULL, + PRIMARY KEY(recipient, tx_sequence_number) ); -CREATE INDEX tx_recipients_tx_sequence_number_index ON tx_recipients (tx_sequence_number, cp_sequence_number); +CREATE INDEX tx_recipients_sender ON tx_recipients (sender, recipient, tx_sequence_number); CREATE TABLE tx_input_objects ( - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL, - -- Object ID in bytes. object_id BYTEA NOT NULL, - PRIMARY KEY(object_id, tx_sequence_number, cp_sequence_number) + sender BYTEA NOT NULL, + PRIMARY KEY(object_id, tx_sequence_number) ); CREATE INDEX tx_input_objects_tx_sequence_number_index ON tx_input_objects (tx_sequence_number); +CREATE INDEX tx_input_objects_sender ON tx_input_objects (sender, object_id, tx_sequence_number); CREATE TABLE tx_changed_objects ( - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL, - -- Object Id in bytes. 
object_id BYTEA NOT NULL, - PRIMARY KEY(object_id, tx_sequence_number, cp_sequence_number) + sender BYTEA NOT NULL, + PRIMARY KEY(object_id, tx_sequence_number) ); CREATE INDEX tx_changed_objects_tx_sequence_number_index ON tx_changed_objects (tx_sequence_number); +CREATE INDEX tx_changed_objects_sender ON tx_changed_objects (sender, object_id, tx_sequence_number); + +CREATE TABLE tx_calls_pkg ( + tx_sequence_number BIGINT NOT NULL, + package BYTEA NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, tx_sequence_number) +); +CREATE INDEX tx_calls_pkg_sender ON tx_calls_pkg (sender, package, tx_sequence_number); + +CREATE TABLE tx_calls_mod ( + tx_sequence_number BIGINT NOT NULL, + package BYTEA NOT NULL, + module TEXT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, tx_sequence_number) +); +CREATE INDEX tx_calls_mod_sender ON tx_calls_mod (sender, package, module, tx_sequence_number); -CREATE TABLE tx_calls ( - cp_sequence_number BIGINT NOT NULL, +CREATE TABLE tx_calls_fun ( tx_sequence_number BIGINT NOT NULL, package BYTEA NOT NULL, module TEXT NOT NULL, func TEXT NOT NULL, - -- 1. Using Primary Key as a unique index. - -- 2. Diesel does not like tables with no primary key. - PRIMARY KEY(package, tx_sequence_number, cp_sequence_number) + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, func, tx_sequence_number) ); -CREATE INDEX tx_calls_module ON tx_calls (package, module, tx_sequence_number, cp_sequence_number); -CREATE INDEX tx_calls_func ON tx_calls (package, module, func, tx_sequence_number, cp_sequence_number); -CREATE INDEX tx_calls_tx_sequence_number ON tx_calls (tx_sequence_number, cp_sequence_number); +CREATE INDEX tx_calls_fun_sender ON tx_calls_fun (sender, package, module, func, tx_sequence_number); --- un-partitioned table for tx_digest -> (cp_sequence_number, tx_sequence_number) lookup. 
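
Splitting `tx_calls` three ways gives each call-granularity filter a table whose primary key is an exact prefix of the query, while the `*_sender` indexes serve the same filters scoped to a sender. A sketch of the lookups these keys appear designed for — the package, module, function, and sender values below are made-up placeholders:

```sql
-- Package-only filter: satisfied by tx_calls_pkg's PRIMARY KEY (package, tx_sequence_number).
SELECT tx_sequence_number
FROM tx_calls_pkg
WHERE package = '\x0000000000000000000000000000000000000000000000000000000000000002'
ORDER BY tx_sequence_number
LIMIT 50;

-- Fully qualified function filter: satisfied by tx_calls_fun's
-- PRIMARY KEY (package, module, func, tx_sequence_number).
SELECT tx_sequence_number
FROM tx_calls_fun
WHERE package = '\x0000000000000000000000000000000000000000000000000000000000000002'
  AND module = 'coin'
  AND func = 'transfer'
ORDER BY tx_sequence_number
LIMIT 50;

-- The same filter restricted to a sender: satisfied by tx_calls_fun_sender,
-- whose leading column is sender.
SELECT tx_sequence_number
FROM tx_calls_fun
WHERE sender = '\x00000000000000000000000000000000000000000000000000000000000000a1'
  AND package = '\x0000000000000000000000000000000000000000000000000000000000000002'
  AND module = 'coin'
  AND func = 'transfer'
ORDER BY tx_sequence_number
LIMIT 50;
```
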
CREATE TABLE tx_digests ( tx_digest BYTEA PRIMARY KEY, - cp_sequence_number BIGINT NOT NULL, tx_sequence_number BIGINT NOT NULL ); CREATE INDEX tx_digests_tx_sequence_number ON tx_digests (tx_sequence_number); + +CREATE TABLE tx_kinds ( + tx_sequence_number BIGINT NOT NULL, + tx_kind SMALLINT NOT NULL, + PRIMARY KEY(tx_kind, tx_sequence_number) +); diff --git a/crates/sui-indexer/migrations/2023-11-29-193859_advance_partition/up.sql b/crates/sui-indexer/migrations/2023-11-29-193859_advance_partition/up.sql index cb24af8e09934..8ca64b86a7081 100644 --- a/crates/sui-indexer/migrations/2023-11-29-193859_advance_partition/up.sql +++ b/crates/sui-indexer/migrations/2023-11-29-193859_advance_partition/up.sql @@ -1,10 +1,10 @@ -CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start_cp BIGINT, new_epoch_start_cp BIGINT) +CREATE OR REPLACE PROCEDURE advance_partition(table_name TEXT, last_epoch BIGINT, new_epoch BIGINT, last_epoch_start BIGINT, new_epoch_start BIGINT) LANGUAGE plpgsql AS $$ BEGIN EXECUTE format('ALTER TABLE %I DETACH PARTITION %I_partition_%s', table_name, table_name, last_epoch); - EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start_cp, new_epoch_start_cp); - EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start_cp); + EXECUTE format('ALTER TABLE %I ATTACH PARTITION %I_partition_%s FOR VALUES FROM (%L) TO (%L)', table_name, table_name, last_epoch, last_epoch_start, new_epoch_start); + EXECUTE format('CREATE TABLE IF NOT EXISTS %I_partition_%s PARTITION OF %I FOR VALUES FROM (%L) TO (MAXVALUE)', table_name, new_epoch, table_name, new_epoch_start); END; $$; diff --git a/crates/sui-indexer/migrations/2024-05-05-155158_obj_indices/down.sql b/crates/sui-indexer/migrations/2024-05-05-155158_obj_indices/down.sql new file mode 100644 index 0000000000000..7a3a7670f24c2 --- /dev/null +++ b/crates/sui-indexer/migrations/2024-05-05-155158_obj_indices/down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS objects_version; diff --git a/crates/sui-indexer/migrations/2024-05-05-155158_obj_indices/up.sql b/crates/sui-indexer/migrations/2024-05-05-155158_obj_indices/up.sql new file mode 100644 index 0000000000000..666e5a2423319 --- /dev/null +++ b/crates/sui-indexer/migrations/2024-05-05-155158_obj_indices/up.sql @@ -0,0 +1,31 @@ +-- Indexing table mapping an object's ID and version to its checkpoint +-- sequence number, partitioned by the first byte of its Object ID. +CREATE TABLE objects_version ( + object_id bytea NOT NULL, + object_version bigint NOT NULL, + cp_sequence_number bigint NOT NULL, + PRIMARY KEY (object_id, object_version) +) PARTITION BY RANGE (object_id); + +-- Create a partition for each first byte value. +DO $$ +DECLARE + lo text; + hi text; +BEGIN + FOR i IN 0..254 LOOP + lo := LPAD(TO_HEX(i), 2, '0'); + hi := LPAD(TO_HEX(i + 1), 2, '0'); + EXECUTE FORMAT($F$ + CREATE TABLE objects_version_%1$s PARTITION OF objects_version FOR VALUES + FROM (E'\\x%1$s00000000000000000000000000000000000000000000000000000000000000') + TO (E'\\x%2$s00000000000000000000000000000000000000000000000000000000000000'); + $F$, lo, hi); + END LOOP; +END; +$$ LANGUAGE plpgsql; + +-- Special case for the last partition, because of the upper bound. 
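
(The `ff` bucket defined next cannot use a finite `TO` bound like the other 255 partitions, so it is created separately with `MAXVALUE`.) A sketch of the point lookup this partitioning scheme is built to serve — resolving an object at a version bound to the checkpoint that wrote it; the object ID is a made-up 32-byte value, and partition pruning confines the scan to the single partition matching the ID's first byte:

```sql
SELECT cp_sequence_number
FROM objects_version
WHERE object_id = '\x1a00000000000000000000000000000000000000000000000000000000000000'
  AND object_version <= 1234
ORDER BY object_version DESC
LIMIT 1;
```
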
+CREATE TABLE objects_version_ff PARTITION OF objects_version FOR VALUES +FROM (E'\\xff00000000000000000000000000000000000000000000000000000000000000') +TO (MAXVALUE); diff --git a/crates/sui-indexer/migrations/2024-06-14-045801_event_indices/down.sql b/crates/sui-indexer/migrations/2024-06-14-045801_event_indices/down.sql new file mode 100644 index 0000000000000..3583887435168 --- /dev/null +++ b/crates/sui-indexer/migrations/2024-06-14-045801_event_indices/down.sql @@ -0,0 +1,7 @@ +DROP TABLE IF EXISTS event_emit_package; +DROP TABLE IF EXISTS event_emit_module; +DROP TABLE IF EXISTS event_struct_package; +DROP TABLE IF EXISTS event_struct_module; +DROP TABLE IF EXISTS event_struct_name; +DROP TABLE IF EXISTS event_struct_instantiation; +DROP TABLE IF EXISTS event_senders; diff --git a/crates/sui-indexer/migrations/2024-06-14-045801_event_indices/up.sql b/crates/sui-indexer/migrations/2024-06-14-045801_event_indices/up.sql new file mode 100644 index 0000000000000..a89625146a9fd --- /dev/null +++ b/crates/sui-indexer/migrations/2024-06-14-045801_event_indices/up.sql @@ -0,0 +1,74 @@ +CREATE TABLE event_emit_package +( + package BYTEA NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_emit_package_sender ON event_emit_package (sender, package, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_emit_module +( + package BYTEA NOT NULL, + module TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_emit_module_sender ON event_emit_module (sender, package, module, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_struct_package +( + package BYTEA NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_struct_package_sender ON event_struct_package (sender, package, tx_sequence_number, event_sequence_number); + + +CREATE TABLE event_struct_module +( + package BYTEA NOT NULL, + module TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_struct_module_sender ON event_struct_module (sender, package, module, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_struct_name +( + package BYTEA NOT NULL, + module TEXT NOT NULL, + type_name TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, type_name, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_struct_name_sender ON event_struct_name (sender, package, module, type_name, tx_sequence_number, event_sequence_number); + +CREATE TABLE event_struct_instantiation +( + package BYTEA NOT NULL, + module TEXT NOT NULL, + type_instantiation TEXT NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + sender BYTEA NOT NULL, + PRIMARY KEY(package, module, type_instantiation, tx_sequence_number, event_sequence_number) +); +CREATE INDEX event_struct_instantiation_sender ON event_struct_instantiation (sender, package, module, type_instantiation, 
tx_sequence_number, event_sequence_number); + +CREATE TABLE event_senders +( + sender BYTEA NOT NULL, + tx_sequence_number BIGINT NOT NULL, + event_sequence_number BIGINT NOT NULL, + PRIMARY KEY(sender, tx_sequence_number, event_sequence_number) +); diff --git a/crates/sui-indexer/src/handlers/checkpoint_handler.rs b/crates/sui-indexer/src/handlers/checkpoint_handler.rs index fcfcbd1e8c86b..14bc720d38dee 100644 --- a/crates/sui-indexer/src/handlers/checkpoint_handler.rs +++ b/crates/sui-indexer/src/handlers/checkpoint_handler.rs @@ -49,8 +49,8 @@ use crate::db::ConnectionPool; use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver}; use crate::store::{IndexerStore, PgIndexerStore}; use crate::types::{ - IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject, - IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex, + EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, + IndexedObject, IndexedPackage, IndexedTransaction, IndexerResult, TransactionKind, TxIndex, }; use super::tx_processor::EpochEndIndexingObjectStore; @@ -199,6 +199,7 @@ where 0, //first_checkpoint_id None, ), + network_total_transactions: 0, })); } @@ -225,7 +226,12 @@ where let event = bcs::from_bytes::(&epoch_event.contents)?; // Now we just entered epoch X, we want to calculate the diff between - // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2) + // TotalTransactionsByEndOfEpoch(X-1) and TotalTransactionsByEndOfEpoch(X-2). Note that on + // the indexer's chain-reading side, this is not guaranteed to have the latest data. Rather + // than impose a wait on the reading side, however, we overwrite this on the persisting + // side, where we can guarantee that the previous epoch's checkpoints have been written to + // db. 
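
Expressed as SQL over the `checkpoints` table from the earlier migration, the recomputation on the persisting side amounts to roughly the following (a sketch; `$1` stands for the epoch that just ended):

```sql
-- network_total_transactions is cumulative, so the maximum over an epoch's
-- checkpoints is its value at the epoch's final checkpoint. The difference
-- between two consecutive epoch maxima is the number of transactions
-- executed during the later epoch.
SELECT (SELECT MAX(network_total_transactions)
        FROM checkpoints
        WHERE epoch = $1)
     - (SELECT COALESCE(MAX(network_total_transactions), 0)
        FROM checkpoints
        WHERE epoch = $1 - 1) AS epoch_total_transactions;
```
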
+ let network_tx_count_prev_epoch = match system_state.epoch { // If first epoch change, this number is 0 1 => Ok(0), @@ -249,6 +255,7 @@ where checkpoint_summary.sequence_number + 1, // first_checkpoint_id Some(&event), ), + network_total_transactions: checkpoint_summary.network_total_transactions, })) } @@ -271,20 +278,21 @@ where let object_history_changes: TransactionObjectChangesToCommit = Self::index_objects_history(data.clone(), package_resolver.clone()).await?; - let (checkpoint, db_transactions, db_events, db_indices, db_displays) = { + let (checkpoint, db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = { let CheckpointData { transactions, checkpoint_summary, checkpoint_contents, } = data; - let (db_transactions, db_events, db_indices, db_displays) = Self::index_transactions( - transactions, - &checkpoint_summary, - &checkpoint_contents, - &metrics, - ) - .await?; + let (db_transactions, db_events, db_tx_indices, db_event_indices, db_displays) = + Self::index_transactions( + transactions, + &checkpoint_summary, + &checkpoint_contents, + &metrics, + ) + .await?; let successful_tx_num: u64 = db_transactions.iter().map(|t| t.successful_tx_num).sum(); ( @@ -295,7 +303,8 @@ where ), db_transactions, db_events, - db_indices, + db_tx_indices, + db_event_indices, db_displays, ) }; @@ -308,7 +317,8 @@ where checkpoint, transactions: db_transactions, events: db_events, - tx_indices: db_indices, + tx_indices: db_tx_indices, + event_indices: db_event_indices, display_updates: db_displays, object_changes, object_history_changes, @@ -326,6 +336,7 @@ where Vec, Vec, Vec, + Vec, BTreeMap, )> { let checkpoint_seq = checkpoint_summary.sequence_number(); @@ -346,7 +357,8 @@ where let mut db_transactions = Vec::new(); let mut db_events = Vec::new(); let mut db_displays = BTreeMap::new(); - let mut db_indices = Vec::new(); + let mut db_tx_indices = Vec::new(); + let mut db_event_indices = Vec::new(); for tx in transactions { let CheckpointTransaction { @@ -364,6 +376,7 @@ where checkpoint_seq, tx_digest, sender_signed_data.digest() ))); } + let tx = sender_signed_data.transaction_data(); let events = events .as_ref() @@ -387,6 +400,12 @@ where ) })); + db_event_indices.extend( + events.iter().enumerate().map(|(idx, event)| { + EventIndex::from_event(tx_sequence_number, idx as u64, event) + }), + ); + db_displays.extend( events .iter() @@ -414,7 +433,7 @@ where object_changes, balance_change, events, - transaction_kind, + transaction_kind: transaction_kind.clone(), successful_tx_num: if fx.status().is_ok() { tx.kind().tx_count() as u64 } else { @@ -442,8 +461,8 @@ where // Payers let payers = vec![tx.gas_owner()]; - // Senders - let senders = vec![tx.sender()]; + // Sender + let sender = tx.sender(); // Recipients let recipients = fx @@ -463,19 +482,26 @@ where .map(|(p, m, f)| (*<&ObjectID>::clone(p), m.to_string(), f.to_string())) .collect(); - db_indices.push(TxIndex { + db_tx_indices.push(TxIndex { tx_sequence_number, transaction_digest: tx_digest, checkpoint_sequence_number: *checkpoint_seq, input_objects, changed_objects, - senders, + sender, payers, recipients, move_calls, + tx_kind: transaction_kind, }); } - Ok((db_transactions, db_events, db_indices, db_displays)) + Ok(( + db_transactions, + db_events, + db_tx_indices, + db_event_indices, + db_displays, + )) } async fn index_objects( diff --git a/crates/sui-indexer/src/handlers/committer.rs b/crates/sui-indexer/src/handlers/committer.rs index f52bcc1d45d38..6f1358922c0af 100644 --- 
a/crates/sui-indexer/src/handlers/committer.rs +++ b/crates/sui-indexer/src/handlers/committer.rs @@ -127,6 +127,7 @@ async fn commit_checkpoints( let mut tx_batch = vec![]; let mut events_batch = vec![]; let mut tx_indices_batch = vec![]; + let mut event_indices_batch = vec![]; let mut display_updates_batch = BTreeMap::new(); let mut object_changes_batch = vec![]; let mut object_history_changes_batch = vec![]; @@ -137,6 +138,7 @@ async fn commit_checkpoints( checkpoint, transactions, events, + event_indices, tx_indices, display_updates, object_changes, @@ -148,6 +150,7 @@ async fn commit_checkpoints( tx_batch.push(transactions); events_batch.push(events); tx_indices_batch.push(tx_indices); + event_indices_batch.push(event_indices); display_updates_batch.extend(display_updates.into_iter()); object_changes_batch.push(object_changes); object_history_changes_batch.push(object_history_changes); @@ -161,6 +164,10 @@ async fn commit_checkpoints( let tx_batch = tx_batch.into_iter().flatten().collect::>(); let tx_indices_batch = tx_indices_batch.into_iter().flatten().collect::>(); let events_batch = events_batch.into_iter().flatten().collect::>(); + let event_indices_batch = event_indices_batch + .into_iter() + .flatten() + .collect::>(); let packages_batch = packages_batch.into_iter().flatten().collect::>(); let checkpoint_num = checkpoint_batch.len(); let tx_count = tx_batch.len(); @@ -171,6 +178,7 @@ async fn commit_checkpoints( state.persist_transactions(tx_batch), state.persist_tx_indices(tx_indices_batch), state.persist_events(events_batch), + state.persist_event_indices(event_indices_batch), state.persist_displays(display_updates_batch), state.persist_packages(packages_batch), state.persist_objects(object_changes_batch.clone()), diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index ca27e92a0bf41..2a6578fc18295 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -6,8 +6,8 @@ use std::collections::BTreeMap; use crate::{ models::display::StoredDisplay, types::{ - IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, IndexedObject, - IndexedPackage, IndexedTransaction, TxIndex, + EventIndex, IndexedCheckpoint, IndexedDeletedObject, IndexedEpochInfo, IndexedEvent, + IndexedObject, IndexedPackage, IndexedTransaction, TxIndex, }, }; @@ -22,6 +22,7 @@ pub struct CheckpointDataToCommit { pub checkpoint: IndexedCheckpoint, pub transactions: Vec, pub events: Vec, + pub event_indices: Vec, pub tx_indices: Vec, pub display_updates: BTreeMap, pub object_changes: TransactionObjectChangesToCommit, @@ -40,4 +41,5 @@ pub struct TransactionObjectChangesToCommit { pub struct EpochToCommit { pub last_epoch: Option, pub new_epoch: IndexedEpochInfo, + pub network_total_transactions: u64, } diff --git a/crates/sui-indexer/src/indexer_reader.rs b/crates/sui-indexer/src/indexer_reader.rs index 5254c042489eb..b50690c5d7be7 100644 --- a/crates/sui-indexer/src/indexer_reader.rs +++ b/crates/sui-indexer/src/indexer_reader.rs @@ -776,14 +776,14 @@ impl IndexerReader { let package = Hex::encode(package.to_vec()); match (module, function) { (Some(module), Some(function)) => ( - "tx_calls".into(), + "tx_calls_fun".into(), format!( "package = '\\x{}'::bytea AND module = '{}' AND func = '{}'", package, module, function ), ), (Some(module), None) => ( - "tx_calls".into(), + "tx_calls_mod".into(), format!( "package = '\\x{}'::bytea AND module = '{}'", package, module @@ -791,11 +791,11 @@ impl IndexerReader { ), 
(None, Some(_)) => { return Err(IndexerError::InvalidArgumentError( - "Function cannot be present wihtout Module.".into(), + "Function cannot be present without Module.".into(), )); } (None, None) => ( - "tx_calls".into(), + "tx_calls_pkg".into(), format!("package = '\\x{}'::bytea", package), ), } diff --git a/crates/sui-indexer/src/metrics.rs b/crates/sui-indexer/src/metrics.rs index 91fa07101f8b7..eb02c0c074a42 100644 --- a/crates/sui-indexer/src/metrics.rs +++ b/crates/sui-indexer/src/metrics.rs @@ -132,6 +132,8 @@ pub struct IndexerMetrics { pub checkpoint_db_commit_latency_objects_history_chunks: Histogram, pub checkpoint_db_commit_latency_events: Histogram, pub checkpoint_db_commit_latency_events_chunks: Histogram, + pub checkpoint_db_commit_latency_event_indices: Histogram, + pub checkpoint_db_commit_latency_event_indices_chunks: Histogram, pub checkpoint_db_commit_latency_packages: Histogram, pub checkpoint_db_commit_latency_tx_indices: Histogram, pub checkpoint_db_commit_latency_tx_indices_chunks: Histogram, @@ -461,7 +463,20 @@ impl IndexerMetrics { registry, ) .unwrap(), - + checkpoint_db_commit_latency_event_indices: register_histogram_with_registry!( + "checkpoint_db_commit_latency_event_indices", + "Time spent commiting event indices", + DB_COMMIT_LATENCY_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), + checkpoint_db_commit_latency_event_indices_chunks: register_histogram_with_registry!( + "checkpoint_db_commit_latency_event_indices_chunks", + "Time spent commiting event indices chunks", + DB_COMMIT_LATENCY_SEC_BUCKETS.to_vec(), + registry, + ) + .unwrap(), checkpoint_db_commit_latency_packages: register_histogram_with_registry!( "checkpoint_db_commit_latency_packages", "Time spent commiting packages", diff --git a/crates/sui-indexer/src/models/checkpoints.rs b/crates/sui-indexer/src/models/checkpoints.rs index 2848aad2cbe12..c2310da9f0834 100644 --- a/crates/sui-indexer/src/models/checkpoints.rs +++ b/crates/sui-indexer/src/models/checkpoints.rs @@ -37,6 +37,8 @@ pub struct StoredCheckpoint { pub checkpoint_commitments: Vec, pub validator_signature: Vec, pub end_of_epoch_data: Option>, + pub min_tx_sequence_number: Option, + pub max_tx_sequence_number: Option, } impl From<&IndexedCheckpoint> for StoredCheckpoint { @@ -68,6 +70,8 @@ impl From<&IndexedCheckpoint> for StoredCheckpoint { .as_ref() .map(|d| bcs::to_bytes(d).unwrap()), end_of_epoch: c.end_of_epoch_data.is_some(), + min_tx_sequence_number: Some(c.min_tx_sequence_number as i64), + max_tx_sequence_number: Some(c.max_tx_sequence_number as i64), } } } diff --git a/crates/sui-indexer/src/models/event_indices.rs b/crates/sui-indexer/src/models/event_indices.rs new file mode 100644 index 0000000000000..08f17cce339d5 --- /dev/null +++ b/crates/sui-indexer/src/models/event_indices.rs @@ -0,0 +1,145 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + schema::{ + event_emit_module, event_emit_package, event_senders, event_struct_instantiation, + event_struct_module, event_struct_name, event_struct_package, + }, + types::EventIndex, +}; +use diesel::prelude::*; + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_emit_package)] +pub struct StoredEventEmitPackage { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_emit_module)] +pub struct StoredEventEmitModule { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub module: String, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_senders)] +pub struct StoredEventSenders { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_struct_package)] +pub struct StoredEventStructPackage { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_struct_module)] +pub struct StoredEventStructModule { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub module: String, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_struct_name)] +pub struct StoredEventStructName { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub module: String, + pub type_name: String, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = event_struct_instantiation)] +pub struct StoredEventStructInstantiation { + pub tx_sequence_number: i64, + pub event_sequence_number: i64, + pub package: Vec, + pub module: String, + pub type_instantiation: String, + pub sender: Vec, +} + +impl EventIndex { + pub fn split( + self: EventIndex, + ) -> ( + StoredEventEmitPackage, + StoredEventEmitModule, + StoredEventSenders, + StoredEventStructPackage, + StoredEventStructModule, + StoredEventStructName, + StoredEventStructInstantiation, + ) { + let tx_sequence_number = self.tx_sequence_number as i64; + let event_sequence_number = self.event_sequence_number as i64; + ( + StoredEventEmitPackage { + tx_sequence_number, + event_sequence_number, + package: self.emit_package.to_vec(), + sender: self.sender.to_vec(), + }, + StoredEventEmitModule { + tx_sequence_number, + event_sequence_number, + package: self.emit_package.to_vec(), + module: self.emit_module.clone(), + sender: self.sender.to_vec(), + }, + StoredEventSenders { + tx_sequence_number, + event_sequence_number, + sender: self.sender.to_vec(), + }, + StoredEventStructPackage { + tx_sequence_number, + event_sequence_number, + package: self.type_package.to_vec(), + sender: self.sender.to_vec(), + }, + StoredEventStructModule { + tx_sequence_number, + event_sequence_number, + package: self.type_package.to_vec(), + module: self.type_module.clone(), + sender: self.sender.to_vec(), + }, + StoredEventStructName { + tx_sequence_number, + event_sequence_number, + package: self.type_package.to_vec(), + module: self.type_module.clone(), + type_name: 
self.type_name.clone(), + sender: self.sender.to_vec(), + }, + StoredEventStructInstantiation { + tx_sequence_number, + event_sequence_number, + package: self.type_package.to_vec(), + module: self.type_module.clone(), + type_instantiation: self.type_instantiation.clone(), + sender: self.sender.to_vec(), + }, + ) + } +} diff --git a/crates/sui-indexer/src/models/mod.rs b/crates/sui-indexer/src/models/mod.rs index 3b8233ec45021..b677e09f1aaad 100644 --- a/crates/sui-indexer/src/models/mod.rs +++ b/crates/sui-indexer/src/models/mod.rs @@ -4,7 +4,9 @@ pub mod checkpoints; pub mod display; pub mod epoch; +pub mod event_indices; pub mod events; +pub mod obj_indices; pub mod objects; pub mod packages; pub mod transactions; diff --git a/crates/sui-indexer/src/models/obj_indices.rs b/crates/sui-indexer/src/models/obj_indices.rs new file mode 100644 index 0000000000000..7e5298008834c --- /dev/null +++ b/crates/sui-indexer/src/models/obj_indices.rs @@ -0,0 +1,40 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use diesel::prelude::*; + +use crate::schema::objects_version; + +use super::objects::StoredDeletedObject; +use super::objects::StoredObject; + +/// Model types related to tables that support efficient execution of queries on the `objects`, +/// `objects_history` and `objects_snapshot` tables. + +#[derive(Queryable, Insertable, Debug, Identifiable, Clone, QueryableByName)] +#[diesel(table_name = objects_version, primary_key(object_id, object_version))] +pub struct StoredObjectVersion { + pub object_id: Vec, + pub object_version: i64, + pub cp_sequence_number: i64, +} + +impl From<&StoredObject> for StoredObjectVersion { + fn from(o: &StoredObject) -> Self { + Self { + object_id: o.object_id.clone(), + object_version: o.object_version, + cp_sequence_number: o.checkpoint_sequence_number, + } + } +} + +impl From<&StoredDeletedObject> for StoredObjectVersion { + fn from(o: &StoredDeletedObject) -> Self { + Self { + object_id: o.object_id.clone(), + object_version: o.object_version, + cp_sequence_number: o.checkpoint_sequence_number, + } + } +} diff --git a/crates/sui-indexer/src/models/packages.rs b/crates/sui-indexer/src/models/packages.rs index 63f61f01fc428..97c8e8fc5b459 100644 --- a/crates/sui-indexer/src/models/packages.rs +++ b/crates/sui-indexer/src/models/packages.rs @@ -10,14 +10,20 @@ use diesel::prelude::*; #[diesel(table_name = packages, primary_key(package_id))] pub struct StoredPackage { pub package_id: Vec, + pub original_id: Vec, + pub package_version: i64, pub move_package: Vec, + pub checkpoint_sequence_number: i64, } impl From for StoredPackage { fn from(p: IndexedPackage) -> Self { Self { package_id: p.package_id.to_vec(), + original_id: p.move_package.original_package_id().to_vec(), + package_version: p.move_package.version().value() as i64, move_package: bcs::to_bytes(&p.move_package).unwrap(), + checkpoint_sequence_number: p.checkpoint_sequence_number as i64, } } } diff --git a/crates/sui-indexer/src/models/tx_indices.rs b/crates/sui-indexer/src/models/tx_indices.rs index 1d6e1e386aec2..9868c1e83ba17 100644 --- a/crates/sui-indexer/src/models/tx_indices.rs +++ b/crates/sui-indexer/src/models/tx_indices.rs @@ -3,7 +3,8 @@ use crate::{ schema::{ - tx_calls, tx_changed_objects, tx_digests, tx_input_objects, tx_recipients, tx_senders, + tx_calls_fun, tx_calls_mod, tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, + tx_kinds, tx_recipients, tx_senders, }, types::TxIndex, }; @@ -24,7 +25,6 @@ pub struct TxDigest { #[derive(Queryable, 
Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_senders)] pub struct StoredTxSenders { - pub cp_sequence_number: i64, pub tx_sequence_number: i64, pub sender: Vec, } @@ -32,35 +32,52 @@ pub struct StoredTxSenders { #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_recipients)] pub struct StoredTxRecipients { - pub cp_sequence_number: i64, pub tx_sequence_number: i64, pub recipient: Vec, + pub sender: Vec, } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_input_objects)] pub struct StoredTxInputObject { - pub cp_sequence_number: i64, pub tx_sequence_number: i64, pub object_id: Vec, + pub sender: Vec, } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] #[diesel(table_name = tx_changed_objects)] pub struct StoredTxChangedObject { - pub cp_sequence_number: i64, pub tx_sequence_number: i64, pub object_id: Vec, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = tx_calls_pkg)] +pub struct StoredTxPkg { + pub tx_sequence_number: i64, + pub package: Vec, + pub sender: Vec, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = tx_calls_mod)] +pub struct StoredTxMod { + pub tx_sequence_number: i64, + pub package: Vec, + pub module: String, + pub sender: Vec, } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] -#[diesel(table_name = tx_calls)] -pub struct StoredTxCalls { - pub cp_sequence_number: i64, +#[diesel(table_name = tx_calls_fun)] +pub struct StoredTxFun { pub tx_sequence_number: i64, pub package: Vec, pub module: String, pub func: String, + pub sender: Vec, } #[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] @@ -68,7 +85,13 @@ pub struct StoredTxCalls { pub struct StoredTxDigest { pub tx_digest: Vec, pub tx_sequence_number: i64, - pub cp_sequence_number: i64, +} + +#[derive(Queryable, Insertable, Selectable, Debug, Clone, Default)] +#[diesel(table_name = tx_kinds)] +pub struct StoredTxKind { + pub tx_kind: i16, + pub tx_sequence_number: i64, } #[allow(clippy::type_complexity)] @@ -80,71 +103,109 @@ impl TxIndex { Vec, Vec, Vec, - Vec, + Vec, + Vec, + Vec, Vec, + Vec, ) { let tx_sequence_number = self.tx_sequence_number as i64; - let cp_sequence_number = self.checkpoint_sequence_number as i64; - let tx_senders = self - .senders - .iter() - .map(|s| StoredTxSenders { - cp_sequence_number, - tx_sequence_number, - sender: s.to_vec(), - }) - .collect(); + let tx_sender = StoredTxSenders { + tx_sequence_number, + sender: self.sender.to_vec(), + }; let tx_recipients = self .recipients .iter() .map(|s| StoredTxRecipients { - cp_sequence_number, tx_sequence_number, recipient: s.to_vec(), + sender: self.sender.to_vec(), }) .collect(); let tx_input_objects = self .input_objects .iter() .map(|o| StoredTxInputObject { - cp_sequence_number, tx_sequence_number, object_id: bcs::to_bytes(&o).unwrap(), + sender: self.sender.to_vec(), }) .collect(); let tx_changed_objects = self .changed_objects .iter() .map(|o| StoredTxChangedObject { - cp_sequence_number, tx_sequence_number, object_id: bcs::to_bytes(&o).unwrap(), + sender: self.sender.to_vec(), }) .collect(); - let tx_calls = self + + let mut packages = Vec::new(); + let mut packages_modules = Vec::new(); + let mut packages_modules_funcs = Vec::new(); + + for (pkg, pkg_mod, pkg_mod_func) in self .move_calls .iter() - .map(|(p, m, f)| StoredTxCalls { - cp_sequence_number, + .map(|(p, m, f)| 
(*p, (*p, m.clone()), (*p, m.clone(), f.clone()))) + { + packages.push(pkg); + packages_modules.push(pkg_mod); + packages_modules_funcs.push(pkg_mod_func); + } + + let tx_pkgs = packages + .iter() + .map(|p| StoredTxPkg { + tx_sequence_number, + package: p.to_vec(), + sender: self.sender.to_vec(), + }) + .collect(); + + let tx_mods = packages_modules + .iter() + .map(|(p, m)| StoredTxMod { + tx_sequence_number, + package: p.to_vec(), + module: m.to_string(), + sender: self.sender.to_vec(), + }) + .collect(); + + let tx_calls = packages_modules_funcs + .iter() + .map(|(p, m, f)| StoredTxFun { tx_sequence_number, package: p.to_vec(), module: m.to_string(), func: f.to_string(), + sender: self.sender.to_vec(), }) .collect(); + let stored_tx_digest = StoredTxDigest { tx_digest: self.transaction_digest.into_inner().to_vec(), tx_sequence_number, - cp_sequence_number, + }; + + let tx_kind = StoredTxKind { + tx_kind: self.tx_kind as i16, + tx_sequence_number, }; ( - tx_senders, + vec![tx_sender], tx_recipients, tx_input_objects, tx_changed_objects, + tx_pkgs, + tx_mods, tx_calls, vec![stored_tx_digest], + vec![tx_kind], ) } } diff --git a/crates/sui-indexer/src/schema.rs b/crates/sui-indexer/src/schema.rs index 564f7cd721343..1ef8e0aed8e81 100644 --- a/crates/sui-indexer/src/schema.rs +++ b/crates/sui-indexer/src/schema.rs @@ -26,6 +26,8 @@ diesel::table! { checkpoint_commitments -> Bytea, validator_signature -> Bytea, end_of_epoch_data -> Nullable, + min_tx_sequence_number -> Nullable, + max_tx_sequence_number -> Nullable } } @@ -71,7 +73,75 @@ diesel::table! { } diesel::table! { - events (tx_sequence_number, event_sequence_number, checkpoint_sequence_number) { + event_emit_module (package, module, tx_sequence_number, event_sequence_number) { + package -> Bytea, + module -> Text, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_emit_package (package, tx_sequence_number, event_sequence_number) { + package -> Bytea, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_senders (sender, tx_sequence_number, event_sequence_number) { + sender -> Bytea, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + } +} + +diesel::table! { + event_struct_instantiation (package, module, type_instantiation, tx_sequence_number, event_sequence_number) { + package -> Bytea, + module -> Text, + type_instantiation -> Text, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_struct_module (package, module, tx_sequence_number, event_sequence_number) { + package -> Bytea, + module -> Text, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_struct_name (package, module, type_name, tx_sequence_number, event_sequence_number) { + package -> Bytea, + module -> Text, + type_name -> Text, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + event_struct_package (package, tx_sequence_number, event_sequence_number) { + package -> Bytea, + tx_sequence_number -> Int8, + event_sequence_number -> Int8, + sender -> Bytea, + } +} + +diesel::table! { + events (tx_sequence_number, event_sequence_number) { tx_sequence_number -> Int8, event_sequence_number -> Int8, transaction_digest -> Bytea, @@ -89,7 +159,7 @@ diesel::table! { } diesel::table! 
{ - events_partition_0 (tx_sequence_number, event_sequence_number, checkpoint_sequence_number) { + events_partition_0 (tx_sequence_number, event_sequence_number) { tx_sequence_number -> Int8, event_sequence_number -> Int8, transaction_digest -> Bytea, @@ -198,14 +268,25 @@ diesel::table! { } diesel::table! { - packages (package_id) { + objects_version (object_id, object_version) { + object_id -> Bytea, + object_version -> Int8, + cp_sequence_number -> Int8, + } +} + +diesel::table! { + packages (package_id, original_id, package_version) { package_id -> Bytea, + original_id -> Bytea, + package_version -> Int8, move_package -> Bytea, + checkpoint_sequence_number -> Int8, } } diesel::table! { - transactions (tx_sequence_number, checkpoint_sequence_number) { + transactions (tx_sequence_number) { tx_sequence_number -> Int8, transaction_digest -> Bytea, raw_transaction -> Bytea, @@ -221,7 +302,7 @@ diesel::table! { } diesel::table! { - transactions_partition_0 (tx_sequence_number, checkpoint_sequence_number) { + transactions_partition_0 (tx_sequence_number) { tx_sequence_number -> Int8, transaction_digest -> Bytea, raw_transaction -> Bytea, @@ -237,50 +318,72 @@ diesel::table! { } diesel::table! { - tx_calls (package, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_calls_fun (package, module, func, tx_sequence_number) { tx_sequence_number -> Int8, package -> Bytea, module -> Text, func -> Text, + sender -> Bytea, } } diesel::table! { - tx_changed_objects (object_id, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_calls_mod (package, module, tx_sequence_number) { + tx_sequence_number -> Int8, + package -> Bytea, + module -> Text, + sender -> Bytea, + } +} + +diesel::table! { + tx_calls_pkg (package, tx_sequence_number) { + tx_sequence_number -> Int8, + package -> Bytea, + sender -> Bytea, + } +} + +diesel::table! { + tx_changed_objects (object_id, tx_sequence_number) { tx_sequence_number -> Int8, object_id -> Bytea, + sender -> Bytea, } } diesel::table! { tx_digests (tx_digest) { tx_digest -> Bytea, - cp_sequence_number -> Int8, tx_sequence_number -> Int8, } } diesel::table! { - tx_input_objects (object_id, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_input_objects (object_id, tx_sequence_number) { tx_sequence_number -> Int8, object_id -> Bytea, + sender -> Bytea, } } diesel::table! { - tx_recipients (recipient, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_kinds (tx_kind, tx_sequence_number) { + tx_sequence_number -> Int8, + tx_kind -> Int2, + } +} + +diesel::table! { + tx_recipients (recipient, tx_sequence_number) { tx_sequence_number -> Int8, recipient -> Bytea, + sender -> Bytea, } } diesel::table! { - tx_senders (sender, tx_sequence_number, cp_sequence_number) { - cp_sequence_number -> Int8, + tx_senders (sender, tx_sequence_number) { tx_sequence_number -> Int8, sender -> Bytea, } @@ -295,21 +398,33 @@ macro_rules! 
for_all_tables { pruner_cp_watermark, display, epochs, + event_emit_module, + event_emit_package, + event_senders, + event_struct_instantiation, + event_struct_module, + event_struct_name, + event_struct_package, events, objects, objects_history, objects_snapshot, + objects_version, packages, transactions, - tx_calls, + tx_calls_fun, + tx_calls_mod, + tx_calls_pkg, tx_changed_objects, tx_digests, tx_input_objects, + tx_kinds, tx_recipients, tx_senders ); }; } + pub use for_all_tables; for_all_tables!(diesel::allow_tables_to_appear_in_same_query); diff --git a/crates/sui-indexer/src/store/indexer_store.rs b/crates/sui-indexer/src/store/indexer_store.rs index 868fe31416a6c..97516929ffe24 100644 --- a/crates/sui-indexer/src/store/indexer_store.rs +++ b/crates/sui-indexer/src/store/indexer_store.rs @@ -10,7 +10,9 @@ use crate::errors::IndexerError; use crate::handlers::{EpochToCommit, TransactionObjectChangesToCommit}; use crate::models::display::StoredDisplay; use crate::models::objects::{StoredDeletedObject, StoredObject}; -use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex}; +use crate::types::{ + EventIndex, IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex, +}; #[allow(clippy::large_enum_variant)] pub enum ObjectChangeToCommit { @@ -63,6 +65,11 @@ pub trait IndexerStore: Any + Clone + Sync + Send + 'static { async fn persist_tx_indices(&self, indices: Vec) -> Result<(), IndexerError>; async fn persist_events(&self, events: Vec) -> Result<(), IndexerError>; + async fn persist_event_indices( + &self, + event_indices: Vec, + ) -> Result<(), IndexerError>; + async fn persist_displays( &self, display_updates: BTreeMap, diff --git a/crates/sui-indexer/src/store/mod.rs b/crates/sui-indexer/src/store/mod.rs index 245cf848bc9ff..2a86a7c4b73ee 100644 --- a/crates/sui-indexer/src/store/mod.rs +++ b/crates/sui-indexer/src/store/mod.rs @@ -279,6 +279,36 @@ pub mod diesel_macro { }}; } + #[macro_export] + macro_rules! 
persist_chunk_into_table { + ($table:expr, $chunk:expr, $pool:expr) => {{ + let now = std::time::Instant::now(); + let chunk_len = $chunk.len(); + transactional_blocking_with_retry!( + $pool, + |conn| { + for chunk in $chunk.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + insert_or_ignore_into!($table, chunk, conn); + } + Ok::<(), IndexerError>(()) + }, + PG_DB_COMMIT_SLEEP_DURATION + ) + .tap_ok(|_| { + let elapsed = now.elapsed().as_secs_f64(); + info!( + elapsed, + "Persisted {} rows to {}", + chunk_len, + stringify!($table), + ); + }) + .tap_err(|e| { + tracing::error!("Failed to persist {} with error: {}", stringify!($table), e); + }) + }}; + } + pub use blocking_call_is_ok_or_panic; pub use read_only_blocking; pub use read_only_repeatable_blocking; diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index fad01bb61a998..5fe4277e7c41c 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -27,7 +27,6 @@ use crate::db::ConnectionPool; use crate::errors::{Context, IndexerError}; use crate::handlers::EpochToCommit; use crate::handlers::TransactionObjectChangesToCommit; -use crate::insert_or_ignore_into; use crate::metrics::IndexerMetrics; use crate::models::checkpoints::StoredChainIdentifier; use crate::models::checkpoints::StoredCheckpoint; @@ -35,6 +34,7 @@ use crate::models::checkpoints::StoredCpTx; use crate::models::display::StoredDisplay; use crate::models::epoch::StoredEpochInfo; use crate::models::events::StoredEvent; +use crate::models::obj_indices::StoredObjectVersion; use crate::models::objects::{ StoredDeletedHistoryObject, StoredDeletedObject, StoredHistoryObject, StoredObject, StoredObjectSnapshot, @@ -42,12 +42,18 @@ use crate::models::objects::{ use crate::models::packages::StoredPackage; use crate::models::transactions::StoredTransaction; use crate::schema::{ - chain_identifier, checkpoints, display, epochs, events, objects, objects_history, - objects_snapshot, packages, pruner_cp_watermark, transactions, tx_calls, tx_changed_objects, - tx_digests, tx_input_objects, tx_recipients, tx_senders, + chain_identifier, checkpoints, display, epochs, event_emit_module, event_emit_package, + event_senders, event_struct_instantiation, event_struct_module, event_struct_name, + event_struct_package, events, objects, objects_history, objects_snapshot, objects_version, + packages, pruner_cp_watermark, transactions, tx_calls_fun, tx_calls_mod, tx_calls_pkg, + tx_changed_objects, tx_digests, tx_input_objects, tx_kinds, tx_recipients, tx_senders, }; +use crate::types::EventIndex; use crate::types::{IndexedCheckpoint, IndexedEvent, IndexedPackage, IndexedTransaction, TxIndex}; -use crate::{read_only_blocking, transactional_blocking_with_retry}; +use crate::{ + insert_or_ignore_into, persist_chunk_into_table, read_only_blocking, + transactional_blocking_with_retry, +}; use super::pg_partition_manager::{EpochPartitionData, PgPartitionManager}; use super::IndexerStore; @@ -65,7 +71,7 @@ macro_rules! chunk { }}; } -macro_rules! prune_tx_indice_table { +macro_rules! 
prune_tx_or_event_indice_table { ($table:ident, $conn:expr, $min_tx:expr, $max_tx:expr, $context_msg:expr) => { diesel::delete($table::table.filter($table::tx_sequence_number.between($min_tx, $max_tx))) .execute($conn) @@ -439,6 +445,12 @@ impl PgIndexerStore { .eq(excluded(objects_snapshot::checkpoint_sequence_number)), objects_snapshot::owner_type.eq(excluded(objects_snapshot::owner_type)), objects_snapshot::owner_id.eq(excluded(objects_snapshot::owner_id)), + objects_snapshot::object_type_package + .eq(excluded(objects_snapshot::object_type_package)), + objects_snapshot::object_type_module + .eq(excluded(objects_snapshot::object_type_module)), + objects_snapshot::object_type_name + .eq(excluded(objects_snapshot::object_type_name)), objects_snapshot::object_type .eq(excluded(objects_snapshot::object_type)), objects_snapshot::serialized_object @@ -483,13 +495,16 @@ impl PgIndexerStore { .checkpoint_db_commit_latency_objects_history_chunks .start_timer(); let mut mutated_objects: Vec = vec![]; + let mut object_versions: Vec = vec![]; let mut deleted_object_ids: Vec = vec![]; for object in objects { match object { ObjectChangeToCommit::MutatedObject(stored_object) => { + object_versions.push(StoredObjectVersion::from(&stored_object)); mutated_objects.push(stored_object.into()); } ObjectChangeToCommit::DeletedObject(stored_deleted_object) => { + object_versions.push(StoredObjectVersion::from(&stored_deleted_object)); deleted_object_ids.push(stored_deleted_object.into()); } } @@ -509,6 +524,11 @@ impl PgIndexerStore { .context("Failed to write object mutations to objects_history in DB.")?; } + for object_version_chunk in object_versions.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + insert_or_ignore_into!(objects_version::table, object_version_chunk, conn); + } + for deleted_objects_chunk in deleted_object_ids.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { @@ -764,13 +784,144 @@ impl PgIndexerStore { }) } + async fn persist_event_indices_chunk( + &self, + indices: Vec, + ) -> Result<(), IndexerError> { + let guard = self + .metrics + .checkpoint_db_commit_latency_event_indices_chunks + .start_timer(); + let len = indices.len(); + let ( + event_emit_packages, + event_emit_modules, + event_senders, + event_struct_packages, + event_struct_modules, + event_struct_names, + event_struct_instantiations, + ) = indices.into_iter().map(|i| i.split()).fold( + ( + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + Vec::new(), + ), + |( + mut event_emit_packages, + mut event_emit_modules, + mut event_senders, + mut event_struct_packages, + mut event_struct_modules, + mut event_struct_names, + mut event_struct_instantiations, + ), + index| { + event_emit_packages.push(index.0); + event_emit_modules.push(index.1); + event_senders.push(index.2); + event_struct_packages.push(index.3); + event_struct_modules.push(index.4); + event_struct_names.push(index.5); + event_struct_instantiations.push(index.6); + ( + event_emit_packages, + event_emit_modules, + event_senders, + event_struct_packages, + event_struct_modules, + event_struct_names, + event_struct_instantiations, + ) + }, + ); + + // Now persist all the event indices in parallel into their tables. 
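
Each of the tasks pushed below funnels through `persist_chunk_into_table!`, whose `insert_or_ignore_into!` body boils down to an idempotent batch insert, so replaying a checkpoint range cannot duplicate index rows. Per chunk this is roughly the following (shown for one table; all values are placeholders):

```sql
INSERT INTO event_emit_package (package, tx_sequence_number, event_sequence_number, sender)
VALUES ('\x0000000000000000000000000000000000000000000000000000000000000002', 42, 0,
        '\x00000000000000000000000000000000000000000000000000000000000000a1')
ON CONFLICT DO NOTHING;
```
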
+        let mut futures = vec![];
+        futures.push(self.spawn_blocking_task(move |this| {
+            persist_chunk_into_table!(
+                event_emit_package::table,
+                event_emit_packages,
+                &this.blocking_cp
+            )
+        }));
+
+        futures.push(self.spawn_blocking_task(move |this| {
+            persist_chunk_into_table!(
+                event_emit_module::table,
+                event_emit_modules,
+                &this.blocking_cp
+            )
+        }));
+
+        futures.push(self.spawn_blocking_task(move |this| {
+            persist_chunk_into_table!(event_senders::table, event_senders, &this.blocking_cp)
+        }));
+
+        futures.push(self.spawn_blocking_task(move |this| {
+            persist_chunk_into_table!(
+                event_struct_package::table,
+                event_struct_packages,
+                &this.blocking_cp
+            )
+        }));
+
+        futures.push(self.spawn_blocking_task(move |this| {
+            persist_chunk_into_table!(
+                event_struct_module::table,
+                event_struct_modules,
+                &this.blocking_cp
+            )
+        }));
+
+        futures.push(self.spawn_blocking_task(move |this| {
+            persist_chunk_into_table!(
+                event_struct_name::table,
+                event_struct_names,
+                &this.blocking_cp
+            )
+        }));
+
+        futures.push(self.spawn_blocking_task(move |this| {
+            persist_chunk_into_table!(
+                event_struct_instantiation::table,
+                event_struct_instantiations,
+                &this.blocking_cp
+            )
+        }));
+
+        futures::future::join_all(futures)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(|e| {
+                tracing::error!("Failed to join event indices futures in a chunk: {}", e);
+                IndexerError::from(e)
+            })?
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(|e| {
+                IndexerError::PostgresWriteError(format!(
+                    "Failed to persist all event indices in a chunk: {:?}",
+                    e
+                ))
+            })?;
+        let elapsed = guard.stop_and_record();
+        info!(elapsed, "Persisted {} chunked event indices", len);
+        Ok(())
+    }
+
     async fn persist_tx_indices_chunk(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
         let guard = self
             .metrics
             .checkpoint_db_commit_latency_tx_indices_chunks
             .start_timer();
         let len = indices.len();
-        let (senders, recipients, input_objects, changed_objects, calls, digests) =
+        let (senders, recipients, input_objects, changed_objects, pkgs, mods, funs, digests, kinds) =
             indices.into_iter().map(|i| i.split()).fold(
                 (
                     Vec::new(),
@@ -779,29 +930,41 @@ impl PgIndexerStore {
                     Vec::new(),
                     Vec::new(),
                     Vec::new(),
                     Vec::new(),
                     Vec::new(),
+                    Vec::new(),
+                    Vec::new(),
+                    Vec::new(),
                 ),
                 |(
                     mut tx_senders,
                     mut tx_recipients,
                     mut tx_input_objects,
                     mut tx_changed_objects,
-                    mut tx_calls,
+                    mut tx_pkgs,
+                    mut tx_mods,
+                    mut tx_funs,
                     mut tx_digests,
+                    mut tx_kinds,
                 ),
                  index| {
                     tx_senders.extend(index.0);
                     tx_recipients.extend(index.1);
                     tx_input_objects.extend(index.2);
                     tx_changed_objects.extend(index.3);
-                    tx_calls.extend(index.4);
-                    tx_digests.extend(index.5);
+                    tx_pkgs.extend(index.4);
+                    tx_mods.extend(index.5);
+                    tx_funs.extend(index.6);
+                    tx_digests.extend(index.7);
+                    tx_kinds.extend(index.8);
                     (
                         tx_senders,
                         tx_recipients,
                         tx_input_objects,
                         tx_changed_objects,
-                        tx_calls,
+                        tx_pkgs,
+                        tx_mods,
+                        tx_funs,
                         tx_digests,
+                        tx_kinds,
                     )
                 },
             );
@@ -910,19 +1073,15 @@ impl PgIndexerStore {
                     tracing::error!("Failed to persist tx_changed_objects with error: {}", e);
                 })
             }));
+
         futures.push(self.spawn_blocking_task(move |this| {
             let now = Instant::now();
-            let calls_len = calls.len();
+            let rows_len = pkgs.len();
             transactional_blocking_with_retry!(
                 &this.blocking_cp,
                 |conn| {
-                    for chunk in calls.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
-                        diesel::insert_into(tx_calls::table)
-                            .values(chunk)
-                            .on_conflict_do_nothing()
-                            .execute(conn)
-                            .map_err(IndexerError::from)
-                            .context("Failed to write tx_calls chunk to PostgresDB")?;
+                    for chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                        insert_or_ignore_into!(tx_calls_pkg::table, chunk, conn);
                     }
                     Ok::<(), IndexerError>(())
                 },
@@ -930,12 +1089,60 @@ impl PgIndexerStore {
             )
             .tap_ok(|_| {
                 let elapsed = now.elapsed().as_secs_f64();
-                info!(elapsed, "Persisted {} rows to tx_calls tables", calls_len);
+                info!(
+                    elapsed,
+                    "Persisted {} rows to tx_calls_pkg tables", rows_len
+                );
             })
             .tap_err(|e| {
-                tracing::error!("Failed to persist tx_calls with error: {}", e);
+                tracing::error!("Failed to persist tx_calls_pkg with error: {}", e);
             })
         }));
+
+        futures.push(self.spawn_blocking_task(move |this| {
+            let now = Instant::now();
+            let rows_len = mods.len();
+            transactional_blocking_with_retry!(
+                &this.blocking_cp,
+                |conn| {
+                    for chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                        insert_or_ignore_into!(tx_calls_mod::table, chunk, conn);
+                    }
+                    Ok::<(), IndexerError>(())
+                },
+                PG_DB_COMMIT_SLEEP_DURATION
+            )
+            .tap_ok(|_| {
+                let elapsed = now.elapsed().as_secs_f64();
+                info!(elapsed, "Persisted {} rows to tx_calls_mod table", rows_len);
+            })
+            .tap_err(|e| {
+                tracing::error!("Failed to persist tx_calls_mod with error: {}", e);
+            })
+        }));
+
+        futures.push(self.spawn_blocking_task(move |this| {
+            let now = Instant::now();
+            let rows_len = funs.len();
+            transactional_blocking_with_retry!(
+                &this.blocking_cp,
+                |conn| {
+                    for chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                        insert_or_ignore_into!(tx_calls_fun::table, chunk, conn);
+                    }
+                    Ok::<(), IndexerError>(())
+                },
+                PG_DB_COMMIT_SLEEP_DURATION
+            )
+            .tap_ok(|_| {
+                let elapsed = now.elapsed().as_secs_f64();
+                info!(elapsed, "Persisted {} rows to tx_calls_fun table", rows_len);
+            })
+            .tap_err(|e| {
+                tracing::error!("Failed to persist tx_calls_fun with error: {}", e);
+            })
+        }));
+
         futures.push(self.spawn_blocking_task(move |this| {
             let now = Instant::now();
             let calls_len = digests.len();
@@ -943,12 +1150,7 @@ impl PgIndexerStore {
                 &this.blocking_cp,
                 |conn| {
                     for chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
-                        diesel::insert_into(tx_digests::table)
-                            .values(chunk)
-                            .on_conflict_do_nothing()
-                            .execute(conn)
-                            .map_err(IndexerError::from)
-                            .context("Failed to write tx_digests chunk to PostgresDB")?;
+                        insert_or_ignore_into!(tx_digests::table, chunk, conn);
                     }
                     Ok::<(), IndexerError>(())
                 },
@@ -963,6 +1165,28 @@ impl PgIndexerStore {
             })
         }));
 
+        futures.push(self.spawn_blocking_task(move |this| {
+            let now = Instant::now();
+            let rows_len = kinds.len();
+            transactional_blocking_with_retry!(
+                &this.blocking_cp,
+                |conn| {
+                    for chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                        insert_or_ignore_into!(tx_kinds::table, chunk, conn);
+                    }
+                    Ok::<(), IndexerError>(())
+                },
+                Duration::from_secs(60)
+            )
+            .tap_ok(|_| {
+                let elapsed = now.elapsed().as_secs_f64();
+                info!(elapsed, "Persisted {} rows to tx_kinds tables", rows_len);
+            })
+            .tap_err(|e| {
+                tracing::error!("Failed to persist tx_kinds with error: {}", e);
+            })
+        }));
+
         futures::future::join_all(futures)
             .await
             .into_iter()
@@ -990,12 +1214,35 @@ impl PgIndexerStore {
             .checkpoint_db_commit_latency_epoch
             .start_timer();
         let epoch_id = epoch.new_epoch.epoch;
+
         transactional_blocking_with_retry!(
             &self.blocking_cp,
             |conn| {
                 if let Some(last_epoch) = &epoch.last_epoch {
                     let last_epoch_id = last_epoch.epoch;
-                    let last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch);
+                    // Overwrite the `epoch_total_transactions` field on `epoch.last_epoch`,
+                    // because we are not guaranteed to have the latest data in the db when
+                    // it is set on the indexer's chain-reading side. By the time we reach
+                    // `persist_epoch`, however, the checkpoints from an epoch ago must have
+                    // been indexed.
+                    let previous_epoch_network_total_transactions = match epoch_id {
+                        0 | 1 => 0,
+                        _ => {
+                            let prev_epoch_id = epoch_id - 2;
+                            let result = checkpoints::table
+                                .filter(checkpoints::epoch.eq(prev_epoch_id as i64))
+                                .select(max(checkpoints::network_total_transactions))
+                                .first::<Option<i64>>(conn)
+                                .map(|o| o.unwrap_or(0))?;
+
+                            result as u64
+                        }
+                    };
+
+                    let epoch_total_transactions = epoch.network_total_transactions
+                        - previous_epoch_network_total_transactions;
+
+                    let mut last_epoch = StoredEpochInfo::from_epoch_end_info(last_epoch);
+                    last_epoch.epoch_total_transactions = Some(epoch_total_transactions as i64);
                     info!(last_epoch_id, "Persisting epoch end data: {:?}", last_epoch);
                     diesel::insert_into(epochs::table)
                         .values(last_epoch)
@@ -1061,6 +1308,14 @@ impl PgIndexerStore {
             EpochPartitionData::compose_data(epoch_to_commit, last_epoch);
         let table_partitions = self.partition_manager.get_table_partitions()?;
         for (table, (first_partition, last_partition)) in table_partitions {
+            // Only advance the epoch partition for epoch-partitioned tables.
+            if !self
+                .partition_manager
+                .get_strategy(&table)
+                .is_epoch_partitioned()
+            {
+                continue;
+            }
             let guard = self.metrics.advance_epoch_latency.start_timer();
             self.partition_manager.advance_and_prune_epoch_partition(
                 table.clone(),
@@ -1116,47 +1371,121 @@ impl PgIndexerStore {
         )
     }
 
+    fn prune_event_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
+        let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
+        transactional_blocking_with_retry!(
+            &self.blocking_cp,
+            |conn| {
+                prune_tx_or_event_indice_table!(
+                    event_emit_module,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune event_emit_module table"
+                );
+                prune_tx_or_event_indice_table!(
+                    event_emit_package,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune event_emit_package table"
+                );
+                prune_tx_or_event_indice_table![
+                    event_senders,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune event_senders table"
+                ];
+                prune_tx_or_event_indice_table![
+                    event_struct_instantiation,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune event_struct_instantiation table"
+                ];
+                prune_tx_or_event_indice_table![
+                    event_struct_module,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune event_struct_module table"
+                ];
+                prune_tx_or_event_indice_table![
+                    event_struct_name,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune event_struct_name table"
+                ];
+                prune_tx_or_event_indice_table![
+                    event_struct_package,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune event_struct_package table"
+                ];
+                Ok::<(), IndexerError>(())
+            },
+            PG_DB_COMMIT_SLEEP_DURATION
+        )
+    }
+
     fn prune_tx_indices_table(&self, min_tx: u64, max_tx: u64) -> Result<(), IndexerError> {
         let (min_tx, max_tx) = (min_tx as i64, max_tx as i64);
         transactional_blocking_with_retry!(
             &self.blocking_cp,
             |conn| {
-                prune_tx_indice_table!(
+                prune_tx_or_event_indice_table!(
                     tx_senders,
                     conn,
                     min_tx,
                     max_tx,
                     "Failed to prune tx_senders table"
                 );
-                prune_tx_indice_table!(
+                prune_tx_or_event_indice_table!(
                     tx_recipients,
                     conn,
                     min_tx,
                     max_tx,
                     "Failed to prune tx_recipients table"
                 );
-                prune_tx_indice_table![
+                prune_tx_or_event_indice_table![
                     tx_input_objects,
                     conn,
                     min_tx,
                     max_tx,
                     "Failed to prune tx_input_objects table"
                 ];
-                prune_tx_indice_table![
+                prune_tx_or_event_indice_table![
                     tx_changed_objects,
                     conn,
                     min_tx,
                     max_tx,
                     "Failed to prune tx_changed_objects table"
                 ];
-                prune_tx_indice_table![
-                    tx_calls,
+                prune_tx_or_event_indice_table![
+                    tx_calls_pkg,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune tx_calls_pkg table"
+                ];
+                prune_tx_or_event_indice_table![
+                    tx_calls_mod,
+                    conn,
+                    min_tx,
+                    max_tx,
+                    "Failed to prune tx_calls_mod table"
+                ];
+                prune_tx_or_event_indice_table![
+                    tx_calls_fun,
                     conn,
                     min_tx,
                     max_tx,
-                    "Failed to prune tx_calls table"
+                    "Failed to prune tx_calls_fun table"
                 ];
-                prune_tx_indice_table![
+                prune_tx_or_event_indice_table![
                     tx_digests,
                     conn,
                     min_tx,
@@ -1590,6 +1919,46 @@ impl IndexerStore for PgIndexerStore {
             .await
     }
 
+    async fn persist_event_indices(&self, indices: Vec<EventIndex>) -> Result<(), IndexerError> {
+        if indices.is_empty() {
+            return Ok(());
+        }
+        let len = indices.len();
+        let guard = self
+            .metrics
+            .checkpoint_db_commit_latency_event_indices
+            .start_timer();
+        let chunks = chunk!(indices, self.config.parallel_chunk_size);
+
+        let futures = chunks
+            .into_iter()
+            .map(|chunk| {
+                self.spawn_task(move |this: Self| async move {
+                    this.persist_event_indices_chunk(chunk).await
+                })
+            })
+            .collect::<Vec<_>>();
+        futures::future::join_all(futures)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(|e| {
+                tracing::error!("Failed to join persist_event_indices_chunk futures: {}", e);
+                IndexerError::from(e)
+            })?
+            .into_iter()
+            .collect::<Result<Vec<_>, _>>()
+            .map_err(|e| {
+                IndexerError::PostgresWriteError(format!(
+                    "Failed to persist all event_indices chunks: {:?}",
+                    e
+                ))
+            })?;
+        let elapsed = guard.stop_and_record();
+        info!(elapsed, "Persisted {} event_indices chunks", len);
+        Ok(())
+    }
+
     async fn persist_tx_indices(&self, indices: Vec<TxIndex>) -> Result<(), IndexerError> {
         if indices.is_empty() {
             return Ok(());
         }
@@ -1685,6 +2054,21 @@ impl IndexerStore for PgIndexerStore {
             "Pruned transactions for checkpoint {} from tx {} to tx {}",
             cp, min_tx, max_tx
         );
+        self.execute_in_blocking_worker(move |this| {
+            this.prune_event_indices_table(min_tx, max_tx)
+        })
+        .await
+        .unwrap_or_else(|e| {
+            tracing::error!(
+                "Failed to prune events of transactions for cp {}: {}",
+                cp,
+                e
+            );
+        });
+        info!(
+            "Pruned events of transactions for checkpoint {} from tx {} to tx {}",
+            cp, min_tx, max_tx
+        );
 
         self.metrics.last_pruned_transaction.set(max_tx as i64);
         self.execute_in_blocking_worker(move |this| this.prune_cp_tx_table(cp))
diff --git a/crates/sui-indexer/src/store/pg_partition_manager.rs b/crates/sui-indexer/src/store/pg_partition_manager.rs
index 452c2f6f3812b..e8d3696fa2317 100644
--- a/crates/sui-indexer/src/store/pg_partition_manager.rs
+++ b/crates/sui-indexer/src/store/pg_partition_manager.rs
@@ -4,7 +4,7 @@
 use diesel::r2d2::R2D2Connection;
 use diesel::sql_types::{BigInt, VarChar};
 use diesel::{QueryableByName, RunQueryDsl};
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, HashMap};
 use std::time::Duration;
 use tracing::{error, info};
@@ -30,22 +30,42 @@ GROUP BY table_name;
 pub struct PgPartitionManager {
     cp: ConnectionPool,
+    partition_strategies: HashMap<&'static str, PgPartitionStrategy>,
 }
 
 impl Clone for PgPartitionManager {
     fn clone(&self) -> PgPartitionManager {
         Self {
             cp: self.cp.clone(),
+            partition_strategies: self.partition_strategies.clone(),
         }
     }
 }
 
+#[derive(Clone, Copy)]
+pub enum PgPartitionStrategy {
+    CheckpointSequenceNumber,
+    TxSequenceNumber,
+    ObjectId,
+}
+
+impl PgPartitionStrategy {
+    pub fn is_epoch_partitioned(&self) -> bool {
+        matches!(
+            self,
+            Self::CheckpointSequenceNumber | Self::TxSequenceNumber
+        )
+    }
+}
+
 #[derive(Clone, Debug)]
 pub struct EpochPartitionData {
     last_epoch: u64,
     next_epoch: u64,
     last_epoch_start_cp: u64,
     next_epoch_start_cp: u64,
+    last_epoch_start_tx: u64,
+    next_epoch_start_tx: u64,
 }
 
 impl EpochPartitionData {
@@ -54,18 +74,35 @@ impl EpochPartitionData {
         let last_epoch_start_cp = last_db_epoch.first_checkpoint_id as u64;
         let next_epoch = epoch.new_epoch.epoch;
         let next_epoch_start_cp = epoch.new_epoch.first_checkpoint_id;
+
+        // Determining the tx_sequence_number range for an epoch partition differs from the
+        // checkpoint_sequence_number range: the former is derived from the cumulative
+        // transaction count, which already accounts for the off-by-one.
+        let next_epoch_start_tx = epoch.network_total_transactions;
+        let last_epoch_start_tx =
+            next_epoch_start_tx - last_db_epoch.epoch_total_transactions.unwrap() as u64;
+
         Self {
             last_epoch,
             next_epoch,
             last_epoch_start_cp,
             next_epoch_start_cp,
+            last_epoch_start_tx,
+            next_epoch_start_tx,
         }
     }
 }
 
 impl PgPartitionManager {
     pub fn new(cp: ConnectionPool) -> Result<Self, IndexerError> {
-        let manager = Self { cp };
+        let mut partition_strategies = HashMap::new();
+        partition_strategies.insert("events", PgPartitionStrategy::TxSequenceNumber);
+        partition_strategies.insert("transactions", PgPartitionStrategy::TxSequenceNumber);
+        partition_strategies.insert("objects_version", PgPartitionStrategy::ObjectId);
+        let manager = Self {
+            cp,
+            partition_strategies,
+        };
         let tables = manager.get_table_partitions()?;
         info!(
             "Found {} tables with partitions : [{:?}]",
@@ -102,6 +139,32 @@ impl PgPartitionManager {
         )
     }
 
+    /// Tries to fetch the partitioning strategy for the given partitioned table. Defaults to
+    /// `CheckpointSequenceNumber`, as the majority of our tables are partitioned on an epoch's
+    /// checkpoint range today.
+    pub fn get_strategy(&self, table_name: &str) -> PgPartitionStrategy {
+        self.partition_strategies
+            .get(table_name)
+            .copied()
+            .unwrap_or(PgPartitionStrategy::CheckpointSequenceNumber)
+    }
+
+    pub fn determine_epoch_partition_range(
+        &self,
+        table_name: &str,
+        data: &EpochPartitionData,
+    ) -> Option<(u64, u64)> {
+        match self.get_strategy(table_name) {
+            PgPartitionStrategy::CheckpointSequenceNumber => {
+                Some((data.last_epoch_start_cp, data.next_epoch_start_cp))
+            }
+            PgPartitionStrategy::TxSequenceNumber => {
+                Some((data.last_epoch_start_tx, data.next_epoch_start_tx))
+            }
+            PgPartitionStrategy::ObjectId => None,
+        }
+    }
+
     pub fn advance_and_prune_epoch_partition(
         &self,
         table: String,
@@ -110,6 +173,9 @@ impl PgPartitionManager {
         data: &EpochPartitionData,
         epochs_to_keep: Option<u64>,
     ) -> Result<(), IndexerError> {
+        let Some(partition_range) = self.determine_epoch_partition_range(&table, data) else {
+            return Ok(());
+        };
         if data.next_epoch == 0 {
             tracing::info!("Epoch 0 partition has been created in the initial setup.");
             return Ok(());
@@ -123,16 +189,16 @@ impl PgPartitionManager {
                         .bind::<VarChar, _>(table.clone())
                         .bind::<BigInt, _>(data.last_epoch as i64)
                         .bind::<BigInt, _>(data.next_epoch as i64)
-                        .bind::<BigInt, _>(data.last_epoch_start_cp as i64)
-                        .bind::<BigInt, _>(data.next_epoch_start_cp as i64),
+                        .bind::<BigInt, _>(partition_range.0 as i64)
+                        .bind::<BigInt, _>(partition_range.1 as i64),
                     conn,
                 )
             },
             Duration::from_secs(10)
         )?;
         info!(
-            "Advanced epoch partition for table {} from {} to {}",
-            table, last_partition, data.next_epoch
+            "Advanced epoch partition for table {} from {} to {}, prev partition upper bound {}",
+            table, last_partition, data.next_epoch, partition_range.0
         );
 
         // prune old partitions beyond the retention period
diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs
index 7eb2c0071ce7f..b3fd4dabddac5 100644
--- a/crates/sui-indexer/src/types.rs
+++ b/crates/sui-indexer/src/types.rs
@@ -89,6 +89,8 @@ impl IndexedCheckpoint {
     }
 }
 
+/// Represents system state and summary info at the start and end of an epoch. Optional fields
+/// are populated at the epoch boundary, since they cannot be determined at the start of the epoch.
 #[derive(Clone, Debug, Default)]
 pub struct IndexedEpochInfo {
     pub epoch: u64,
@@ -134,6 +136,9 @@ impl IndexedEpochInfo {
         }
     }
 
+    /// Creates `IndexedEpochInfo` for epoch X-1 at the boundary between epochs X-1 and X.
+    /// `network_total_tx_num_at_last_epoch_end` is needed to determine the number of
+    /// transactions that occurred in epoch X-1.
     pub fn from_end_of_epoch_data(
         system_state_summary: &SuiSystemStateSummary,
         last_checkpoint_summary: &CertifiedCheckpointSummary,
@@ -217,6 +222,47 @@ impl IndexedEvent {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct EventIndex {
+    pub tx_sequence_number: u64,
+    pub event_sequence_number: u64,
+    pub sender: SuiAddress,
+    pub emit_package: ObjectID,
+    pub emit_module: String,
+    pub type_package: ObjectID,
+    pub type_module: String,
+    /// Struct name of the event, without type parameters.
+    pub type_name: String,
+    /// Type instantiation of the event, with type name and type parameters, if any.
+    pub type_instantiation: String,
+}
+
+impl EventIndex {
+    pub fn from_event(
+        tx_sequence_number: u64,
+        event_sequence_number: u64,
+        event: &sui_types::event::Event,
+    ) -> Self {
+        let type_instantiation = event
+            .type_
+            .to_canonical_string(/* with_prefix */ true)
+            .splitn(3, "::")
+            .collect::<Vec<_>>()[2]
+            .to_string();
+        Self {
+            tx_sequence_number,
+            event_sequence_number,
+            sender: event.sender,
+            emit_package: event.package_id,
+            emit_module: event.transaction_module.to_string(),
+            type_package: event.type_.address.into(),
+            type_module: event.type_.module.to_string(),
+            type_name: event.type_.name.to_string(),
+            type_instantiation,
+        }
+    }
+}
+
 #[derive(Debug, Copy, Clone)]
 pub enum OwnerType {
     Immutable = 0,
@@ -363,12 +409,13 @@ pub struct IndexedTransaction {
 
 #[derive(Debug, Clone)]
 pub struct TxIndex {
     pub tx_sequence_number: u64,
+    pub tx_kind: TransactionKind,
     pub transaction_digest: TransactionDigest,
     pub checkpoint_sequence_number: u64,
     pub input_objects: Vec<ObjectID>,
     pub changed_objects: Vec<ObjectID>,
     pub payers: Vec<SuiAddress>,
-    pub senders: Vec<SuiAddress>,
+    pub sender: SuiAddress,
     pub recipients: Vec<SuiAddress>,
     pub move_calls: Vec<(ObjectID, String, String)>,
 }
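
A note on the fan-out in `persist_event_indices_chunk`: each `EventIndex` is `split()` into one row per side table, and a fold gathers those rows into parallel batches that are then written concurrently. A minimal sketch of the pattern, with hypothetical two-table types standing in for the seven real ones:

```rust
// Illustrative only: `MiniEventIndex`, `EmitPackageRow`, and `SenderRow` are
// made-up stand-ins for the indexer's actual types.
struct EmitPackageRow(u64, String);
struct SenderRow(u64, String);

struct MiniEventIndex {
    tx_sequence_number: u64,
    emit_package: String,
    sender: String,
}

impl MiniEventIndex {
    // Mirrors the `split()` call in the diff: one source row becomes one row
    // for each per-table index.
    fn split(self) -> (EmitPackageRow, SenderRow) {
        (
            EmitPackageRow(self.tx_sequence_number, self.emit_package),
            SenderRow(self.tx_sequence_number, self.sender),
        )
    }
}

fn split_all(indices: Vec<MiniEventIndex>) -> (Vec<EmitPackageRow>, Vec<SenderRow>) {
    indices.into_iter().map(|i| i.split()).fold(
        (Vec::new(), Vec::new()),
        |(mut packages, mut senders), (p, s)| {
            packages.push(p);
            senders.push(s);
            (packages, senders)
        },
    )
}

fn main() {
    let (pkgs, senders) = split_all(vec![MiniEventIndex {
        tx_sequence_number: 7,
        emit_package: "0x2".into(),
        sender: "0xa11ce".into(),
    }]);
    assert_eq!(pkgs.len(), 1);
    assert_eq!(senders.len(), 1);
}
```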
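The hunks above also swap the hand-written `diesel::insert_into(...).on_conflict_do_nothing()` chains for an `insert_or_ignore_into!` macro. Its definition is not part of this diff; judging from the code it replaces, it presumably expands to roughly the following. This is an assumption, not the crate's actual definition, and it is meant to be used inside a closure that returns `Result<_, IndexerError>`:

```rust
// Hypothetical shape, inferred from the removed code: an insert that turns
// into INSERT ... ON CONFLICT DO NOTHING, with the error wrapped and given
// context before `?` propagates it.
macro_rules! insert_or_ignore_into {
    ($table:expr, $chunk:expr, $conn:expr) => {
        diesel::insert_into($table)
            .values($chunk)
            .on_conflict_do_nothing()
            .execute($conn)
            .map_err(IndexerError::from)
            .context("Failed to write chunk to PostgresDB")?
    };
}
```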
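The double `collect::<Result<...>>` after `futures::future::join_all` appears in several places above; the first pass surfaces runtime join errors (panics or cancellation), the second surfaces the tasks' own errors. In miniature, assuming `tokio` and `futures` as dependencies and `String` standing in for `IndexerError`:

```rust
use tokio::task::JoinHandle;

// Two error layers: JoinHandle yields Result<T, JoinError>, and T here is
// itself a Result from the task body.
async fn run_all(handles: Vec<JoinHandle<Result<(), String>>>) -> Result<(), String> {
    futures::future::join_all(handles)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>() // JoinError layer
        .map_err(|e| format!("join error: {e}"))?
        .into_iter()
        .collect::<Result<(), _>>() // task-error layer
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let handles = (0..3)
        .map(|_| tokio::spawn(async { Ok::<(), String>(()) }))
        .collect::<Vec<_>>();
    run_all(handles).await
}
```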
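On the `persist_epoch` change: `network_total_transactions` is cumulative, so the per-epoch count for epoch X-1 is the total at its last checkpoint minus the total at the last checkpoint of epoch X-2, with epochs 0 and 1 falling back to zero because no epoch X-2 exists. A worked example with made-up numbers:

```rust
// Cumulative totals make the per-epoch count a simple difference.
fn epoch_total_transactions(
    network_total_at_last_epoch_end: u64, // end of epoch X-1
    network_total_at_prev_epoch_end: u64, // end of epoch X-2; 0 when epoch_id is 0 or 1
) -> u64 {
    network_total_at_last_epoch_end - network_total_at_prev_epoch_end
}

fn main() {
    // If the network had 1_500 transactions by the end of epoch 3 and 2_100
    // by the end of epoch 4, then epoch 4 itself contributed 600.
    assert_eq!(epoch_total_transactions(2_100, 1_500), 600);
}
```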
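The same cumulative count drives the new tx-based partition bounds in `EpochPartitionData::compose_data`: an epoch's partition covers `[last_epoch_start_tx, next_epoch_start_tx)`. Continuing the made-up numbers from above:

```rust
fn main() {
    // tx_sequence_number is assigned from the cumulative transaction count,
    // so the totals at the two epoch boundaries are exactly the half-open
    // partition bounds; no +1/-1 adjustment is needed.
    let network_total_transactions = 2_100; // at the last checkpoint of the closing epoch
    let epoch_total_transactions = 600; // computed as in persist_epoch

    let next_epoch_start_tx = network_total_transactions;
    let last_epoch_start_tx = next_epoch_start_tx - epoch_total_transactions;

    assert_eq!((last_epoch_start_tx, next_epoch_start_tx), (1_500, 2_100));
}
```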
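Finally, the `splitn(3, "::")` in `EventIndex::from_event` relies on the canonical type string having the shape `<address>::<module>::<name><type params>`, so the third chunk is the struct name plus its instantiation even when the type parameters themselves contain `::`:

```rust
fn main() {
    // Real canonical strings carry full 32-byte hex addresses; shortened here
    // for readability.
    let canonical = "0x2::coin::CoinMetadata<0x2::sui::SUI>";
    // At most three pieces, so the inner "::" of the type parameter survives.
    let type_instantiation = canonical.splitn(3, "::").collect::<Vec<_>>()[2];
    assert_eq!(type_instantiation, "CoinMetadata<0x2::sui::SUI>");
}
```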