diff --git a/Cargo.lock b/Cargo.lock index 37e8e694e1600..b28d1deea3b98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13638,6 +13638,7 @@ dependencies = [ "ntest", "object_store", "prometheus", + "rand 0.8.5", "rayon", "regex", "serde", diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml index 573a50a6f0a9a..0d6acba5a9924 100644 --- a/crates/sui-indexer/Cargo.toml +++ b/crates/sui-indexer/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow.workspace = true +rand = "0.8.5" async-trait.workspace = true axum.workspace = true backoff.workspace = true diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 13d6f63c4d946..72dfa38946a0a 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -935,48 +935,73 @@ impl PgIndexerStore { transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { - diesel::insert_into(event_emit_package::table) - .values(&event_emit_packages) - .on_conflict_do_nothing() - .execute(conn) - .await?; - - diesel::insert_into(event_emit_module::table) - .values(&event_emit_modules) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_emit_packages_chunk in + event_emit_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_emit_package::table) + .values(event_emit_packages_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_senders::table) - .values(&event_senders) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_emit_modules_chunk in + event_emit_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_emit_module::table) + .values(event_emit_modules_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_package::table) - .values(&event_struct_packages) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_senders_chunk in event_senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(event_senders::table) + .values(event_senders_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_module::table) - .values(&event_struct_modules) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_packages_chunk in + event_struct_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_package::table) + .values(event_struct_packages_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_name::table) - .values(&event_struct_names) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_modules_chunk in + event_struct_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_module::table) + .values(event_struct_modules_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_instantiation::table) - .values(&event_struct_instantiations) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_names_chunk in + event_struct_names.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_name::table) + .values(event_struct_names_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } + for event_struct_instantiations_chunk in + event_struct_instantiations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_instantiation::table) + .values(event_struct_instantiations_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } Ok(()) } .scope_boxed() @@ -1065,71 +1090,99 @@ impl PgIndexerStore { transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { - diesel::insert_into(tx_affected_addresses::table) - .values(&affected_addresses) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for affected_addresses_chunk in + affected_addresses.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_affected_addresses::table) + .values(affected_addresses_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_affected_objects::table) - .values(&affected_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for affected_objects_chunk in + affected_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_affected_objects::table) + .values(affected_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_senders::table) - .values(&senders) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for senders_chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_senders::table) + .values(senders_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_recipients::table) - .values(&recipients) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for recipients_chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_recipients::table) + .values(recipients_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_input_objects::table) - .values(&input_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for input_objects_chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_input_objects::table) + .values(input_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_changed_objects::table) - .values(&changed_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for changed_objects_chunk in + changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_changed_objects::table) + .values(changed_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_pkg::table) - .values(&pkgs) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for pkgs_chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_pkg::table) + .values(pkgs_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_mod::table) - .values(&mods) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for mods_chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_mod::table) + .values(mods_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_fun::table) - .values(&funs) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for funs_chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_fun::table) + .values(funs_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_digests::table) - .values(&digests) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for digests_chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_digests::table) + .values(digests_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_kinds::table) - .values(&kinds) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for kinds_chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_kinds::table) + .values(kinds_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } Ok(()) } @@ -1872,9 +1925,12 @@ impl IndexerStore for PgIndexerStore { "Failed to persist all event_indices chunks: {:?}", e )) - })?; - let elapsed = guard.stop_and_record(); - info!(elapsed, "Persisted {} event_indices chunks", len); + }) + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} event_indices chunks", len); + }) + .tap_err(|e| tracing::error!("Failed to persist all event_indices chunks: {:?}", e))?; Ok(()) } @@ -1902,9 +1958,12 @@ impl IndexerStore for PgIndexerStore { "Failed to persist all tx_indices chunks: {:?}", e )) - })?; - let elapsed = guard.stop_and_record(); - info!(elapsed, "Persisted {} tx_indices chunks", len); + }) + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} tx_indices chunks", len); + }) + .tap_err(|e| tracing::error!("Failed to persist all tx_indices chunks: {:?}", e))?; Ok(()) } diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index ee2ec93410d4c..81ce3bbb8de5f 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -1,8 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::errors::IndexerError; use move_core_types::language_storage::StructTag; +use rand::Rng; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use sui_json_rpc_types::{ @@ -25,6 +25,8 @@ use sui_types::sui_serde::SuiStructTag; use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; use sui_types::transaction::SenderSignedData; +use crate::errors::IndexerError; + pub type IndexerResult = Result; #[derive(Debug, Default)] @@ -248,6 +250,24 @@ pub struct EventIndex { pub type_instantiation: String, } +// for ingestion test +impl EventIndex { + pub fn random() -> Self { + let mut rng = rand::thread_rng(); + EventIndex { + tx_sequence_number: rng.gen(), + event_sequence_number: rng.gen(), + sender: SuiAddress::random_for_testing_only(), + emit_package: ObjectID::random(), + emit_module: rng.gen::().to_string(), + type_package: ObjectID::random(), + type_module: rng.gen::().to_string(), + type_name: rng.gen::().to_string(), + type_instantiation: rng.gen::().to_string(), + } + } +} + impl EventIndex { pub fn from_event( tx_sequence_number: u64, @@ -408,6 +428,41 @@ pub struct TxIndex { pub move_calls: Vec<(ObjectID, String, String)>, } +impl TxIndex { + pub fn random() -> Self { + let mut rng = rand::thread_rng(); + TxIndex { + tx_sequence_number: rng.gen(), + tx_kind: if rng.gen_bool(0.5) { + TransactionKind::SystemTransaction + } else { + TransactionKind::ProgrammableTransaction + }, + transaction_digest: TransactionDigest::random(), + checkpoint_sequence_number: rng.gen(), + input_objects: (0..1000).map(|_| ObjectID::random()).collect(), + changed_objects: (0..1000).map(|_| ObjectID::random()).collect(), + affected_objects: (0..1000).map(|_| ObjectID::random()).collect(), + payers: (0..rng.gen_range(0..100)) + .map(|_| SuiAddress::random_for_testing_only()) + .collect(), + sender: SuiAddress::random_for_testing_only(), + recipients: (0..rng.gen_range(0..1000)) + .map(|_| SuiAddress::random_for_testing_only()) + .collect(), + move_calls: (0..rng.gen_range(0..1000)) + .map(|_| { + ( + ObjectID::random(), + rng.gen::().to_string(), + rng.gen::().to_string(), + ) + }) + .collect(), + } + } +} + // ObjectChange is not bcs deserializable, IndexedObjectChange is. #[serde_as] #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]