From b9ab18d103a2e68323f194507333c0451bbd1888 Mon Sep 17 00:00:00 2001
From: Ge Gao <106119108+gegaowp@users.noreply.github.com>
Date: Fri, 11 Oct 2024 11:40:54 -0400
Subject: [PATCH] indexer fix: chunk to avoid PG parameter limit (#19754) (#19790)

## Description

- Fixed a bug that caused the mainnet indexer to stop, also reported in
  https://github.com/MystenLabs/sui/issues/19542: when a transaction has many
  input objects, affected objects, recipients, affected addresses, etc., the
  expanded insert query exceeds the PG parameter limit of 65535. Index writes
  are now chunked so each statement stays under that limit (a short sketch of
  the chunking pattern follows the diff below).
- Also added ingestion tests for large tx indices and event indices, plus
  better error tracing.

## Test plan

Added ingestion tests for tx and event indices.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to your
changes, release notes aren't required. For each box you select, include
information after the relevant heading that describes the impact of your
changes that a user might notice and any actions they must take to implement
updates.

- [ ] Protocol:
- [ ] Nodes (Validators and Full nodes):
- [ ] Indexer:
- [ ] JSON-RPC:
- [ ] GraphQL:
- [ ] CLI:
- [ ] Rust SDK:
- [ ] REST API:
---
 Cargo.lock                                    |   1 +
 crates/sui-indexer/Cargo.toml                 |   1 +
 .../sui-indexer/src/store/pg_indexer_store.rs | 253 +++++++++++-------
 crates/sui-indexer/src/types.rs               |  57 +++-
 crates/sui-indexer/tests/ingestion_tests.rs   |  39 +++
 5 files changed, 253 insertions(+), 98 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 879e5e71bdb3b..f3391891f8d88 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13632,6 +13632,7 @@ dependencies = [
  "ntest",
  "object_store",
  "prometheus",
+ "rand 0.8.5",
  "rayon",
  "regex",
  "serde",

diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml
index ce14401ca9207..9e67a57fb00c4 100644
--- a/crates/sui-indexer/Cargo.toml
+++ b/crates/sui-indexer/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
 
 [dependencies]
 anyhow.workspace = true
+rand = "0.8.5"
 async-trait.workspace = true
 axum.workspace = true
 backoff.workspace = true

diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs
index 0c9438b01ced1..742180b88b3cf 100644
--- a/crates/sui-indexer/src/store/pg_indexer_store.rs
+++ b/crates/sui-indexer/src/store/pg_indexer_store.rs
@@ -905,48 +905,73 @@ impl PgIndexerStore {
         transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
             async {
-                diesel::insert_into(event_emit_package::table)
-                    .values(&event_emit_packages)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
-
-                diesel::insert_into(event_emit_module::table)
-                    .values(&event_emit_modules)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for event_emit_packages_chunk in
+                    event_emit_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(event_emit_package::table)
+                        .values(event_emit_packages_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(event_senders::table)
-                    .values(&event_senders)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for event_emit_modules_chunk in
+                    event_emit_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(event_emit_module::table)
+                        .values(event_emit_modules_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(event_struct_package::table)
-                    .values(&event_struct_packages)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for event_senders_chunk in event_senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(event_senders::table)
+                        .values(event_senders_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(event_struct_module::table)
-                    .values(&event_struct_modules)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for event_struct_packages_chunk in
+                    event_struct_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(event_struct_package::table)
+                        .values(event_struct_packages_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(event_struct_name::table)
-                    .values(&event_struct_names)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for event_struct_modules_chunk in
+                    event_struct_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(event_struct_module::table)
+                        .values(event_struct_modules_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(event_struct_instantiation::table)
-                    .values(&event_struct_instantiations)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for event_struct_names_chunk in
+                    event_struct_names.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(event_struct_name::table)
+                        .values(event_struct_names_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
+                for event_struct_instantiations_chunk in
+                    event_struct_instantiations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(event_struct_instantiation::table)
+                        .values(event_struct_instantiations_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
                 Ok(())
             }
             .scope_boxed()
@@ -1035,71 +1060,99 @@ impl PgIndexerStore {
         transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| {
             async {
-                diesel::insert_into(tx_affected_addresses::table)
-                    .values(&affected_addresses)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for affected_addresses_chunk in
+                    affected_addresses.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(tx_affected_addresses::table)
+                        .values(affected_addresses_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_affected_objects::table)
-                    .values(&affected_objects)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for affected_objects_chunk in
+                    affected_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(tx_affected_objects::table)
+                        .values(affected_objects_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_senders::table)
-                    .values(&senders)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for senders_chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(tx_senders::table)
+                        .values(senders_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_recipients::table)
-                    .values(&recipients)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for recipients_chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(tx_recipients::table)
+                        .values(recipients_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_input_objects::table)
-                    .values(&input_objects)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for input_objects_chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(tx_input_objects::table)
+                        .values(input_objects_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_changed_objects::table)
-                    .values(&changed_objects)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for changed_objects_chunk in
+                    changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX)
+                {
+                    diesel::insert_into(tx_changed_objects::table)
+                        .values(changed_objects_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_calls_pkg::table)
-                    .values(&pkgs)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for pkgs_chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(tx_calls_pkg::table)
+                        .values(pkgs_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_calls_mod::table)
-                    .values(&mods)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for mods_chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(tx_calls_mod::table)
+                        .values(mods_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_calls_fun::table)
-                    .values(&funs)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for funs_chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(tx_calls_fun::table)
+                        .values(funs_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_digests::table)
-                    .values(&digests)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for digests_chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(tx_digests::table)
+                        .values(digests_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
-                diesel::insert_into(tx_kinds::table)
-                    .values(&kinds)
-                    .on_conflict_do_nothing()
-                    .execute(conn)
-                    .await?;
+                for kinds_chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) {
+                    diesel::insert_into(tx_kinds::table)
+                        .values(kinds_chunk)
+                        .on_conflict_do_nothing()
+                        .execute(conn)
+                        .await?;
+                }
 
                 Ok(())
             }
@@ -1826,9 +1879,12 @@ impl IndexerStore for PgIndexerStore {
                     "Failed to persist all event_indices chunks: {:?}",
                     e
                 ))
-            })?;
-        let elapsed = guard.stop_and_record();
-        info!(elapsed, "Persisted {} event_indices chunks", len);
+            })
+            .tap_ok(|_| {
+                let elapsed = guard.stop_and_record();
+                info!(elapsed, "Persisted {} event_indices chunks", len);
+            })
+            .tap_err(|e| tracing::error!("Failed to persist all event_indices chunks: {:?}", e))?;
 
         Ok(())
     }
@@ -1856,9 +1912,12 @@
                     "Failed to persist all tx_indices chunks: {:?}",
                     e
                 ))
-            })?;
-        let elapsed = guard.stop_and_record();
-        info!(elapsed, "Persisted {} tx_indices chunks", len);
+            })
+            .tap_ok(|_| {
+                let elapsed = guard.stop_and_record();
+                info!(elapsed, "Persisted {} tx_indices chunks", len);
+            })
+            .tap_err(|e| tracing::error!("Failed to persist all tx_indices chunks: {:?}", e))?;
 
         Ok(())
     }
diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs
index 33bca14214125..5c2c042dde803 100644
--- a/crates/sui-indexer/src/types.rs
+++ b/crates/sui-indexer/src/types.rs
@@ -1,8 +1,8 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use crate::errors::IndexerError;
 use move_core_types::language_storage::StructTag;
+use rand::Rng;
 use serde::{Deserialize, Serialize};
 use serde_with::serde_as;
 use sui_json_rpc_types::{
@@ -25,6 +25,8 @@ use sui_types::sui_serde::SuiStructTag;
 use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary;
 use sui_types::transaction::SenderSignedData;
 
+use crate::errors::IndexerError;
+
 pub type IndexerResult<T> = Result<T, IndexerError>;
 
 #[derive(Debug, Default)]
@@ -254,6 +256,24 @@ pub struct EventIndex {
     pub type_instantiation: String,
 }
 
+// for ingestion test
+impl EventIndex {
+    pub fn random() -> Self {
+        let mut rng = rand::thread_rng();
+        EventIndex {
+            tx_sequence_number: rng.gen(),
+            event_sequence_number: rng.gen(),
+            sender: SuiAddress::random_for_testing_only(),
+            emit_package: ObjectID::random(),
+            emit_module: rng.gen::<u64>().to_string(),
+            type_package: ObjectID::random(),
+            type_module: rng.gen::<u64>().to_string(),
+            type_name: rng.gen::<u64>().to_string(),
+            type_instantiation: rng.gen::<u64>().to_string(),
+        }
+    }
+}
+
 impl EventIndex {
     pub fn from_event(
         tx_sequence_number: u64,
@@ -414,6 +434,41 @@ pub struct TxIndex {
     pub move_calls: Vec<(ObjectID, String, String)>,
 }
 
+impl TxIndex {
+    pub fn random() -> Self {
+        let mut rng = rand::thread_rng();
+        TxIndex {
+            tx_sequence_number: rng.gen(),
+            tx_kind: if rng.gen_bool(0.5) {
+                TransactionKind::SystemTransaction
+            } else {
+                TransactionKind::ProgrammableTransaction
+            },
+            transaction_digest: TransactionDigest::random(),
+            checkpoint_sequence_number: rng.gen(),
+            input_objects: (0..1000).map(|_| ObjectID::random()).collect(),
+            changed_objects: (0..1000).map(|_| ObjectID::random()).collect(),
+            affected_objects: (0..1000).map(|_| ObjectID::random()).collect(),
+            payers: (0..rng.gen_range(0..100))
+                .map(|_| SuiAddress::random_for_testing_only())
+                .collect(),
+            sender: SuiAddress::random_for_testing_only(),
+            recipients: (0..rng.gen_range(0..1000))
+                .map(|_| SuiAddress::random_for_testing_only())
+                .collect(),
+            move_calls: (0..rng.gen_range(0..1000))
+                .map(|_| {
+                    (
+                        ObjectID::random(),
+                        rng.gen::<u64>().to_string(),
+                        rng.gen::<u64>().to_string(),
+                    )
+                })
+                .collect(),
+        }
+    }
+}
+
 // ObjectChange is not bcs deserializable, IndexedObjectChange is.
 #[serde_as]
 #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
diff --git a/crates/sui-indexer/tests/ingestion_tests.rs b/crates/sui-indexer/tests/ingestion_tests.rs
index 57eaaa7286d5a..c0a69a0862263 100644
--- a/crates/sui-indexer/tests/ingestion_tests.rs
+++ b/crates/sui-indexer/tests/ingestion_tests.rs
@@ -11,7 +11,10 @@ use sui_indexer::models::{
     objects::StoredObject, objects::StoredObjectSnapshot, transactions::StoredTransaction,
 };
 use sui_indexer::schema::{objects, objects_snapshot, transactions};
+use sui_indexer::store::indexer_store::IndexerStore;
 use sui_indexer::test_utils::{set_up, wait_for_checkpoint, wait_for_objects_snapshot};
+use sui_indexer::types::EventIndex;
+use sui_indexer::types::TxIndex;
 use sui_types::base_types::SuiAddress;
 use sui_types::effects::TransactionEffectsAPI;
 use sui_types::gas_coin::GasCoin;
@@ -174,3 +177,39 @@ pub async fn test_objects_snapshot() -> Result<(), IndexerError> {
     assert_eq!(snapshot_object.owner_id, Some(gas_owner_id.to_vec()));
     Ok(())
 }
+
+// test insert large batch of tx_indices
+#[tokio::test]
+pub async fn test_insert_large_batch_tx_indices() -> Result<(), IndexerError> {
+    let tempdir = tempdir().unwrap();
+    let mut sim = Simulacrum::new();
+    let data_ingestion_path = tempdir.path().to_path_buf();
+    sim.set_data_ingestion_path(data_ingestion_path.clone());
+
+    let (_, pg_store, _, _database) = set_up(Arc::new(sim), data_ingestion_path).await;
+
+    let mut v = Vec::new();
+    for _ in 0..1000 {
+        v.push(TxIndex::random());
+    }
+    pg_store.persist_tx_indices(v).await?;
+    Ok(())
+}
+
+// test insert large batch of event_indices
+#[tokio::test]
+pub async fn test_insert_large_batch_event_indices() -> Result<(), IndexerError> {
+    let tempdir = tempdir().unwrap();
+    let mut sim = Simulacrum::new();
+    let data_ingestion_path = tempdir.path().to_path_buf();
+    sim.set_data_ingestion_path(data_ingestion_path.clone());
+
+    let (_, pg_store, _, _database) = set_up(Arc::new(sim), data_ingestion_path).await;
+
+    let mut v = Vec::new();
+    for _ in 0..1000 {
+        v.push(EventIndex::random());
+    }
+    pg_store.persist_event_indices(v).await?;
+    Ok(())
+}
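
For context on the failure mode the description refers to: PostgreSQL accepts at most 65535 bind parameters per statement, and a multi-row `INSERT` binds `rows x columns` parameters, so a single transaction with thousands of affected objects or recipients can overflow that cap in one `values(...)` call. The sketch below is illustrative only: the names `PG_BIND_PARAM_LIMIT`, `Row`, and `PARAMS_PER_ROW` are made up for the example (the patch itself chunks with the existing `PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX` constant and diesel-async, as shown in the diff). It only demonstrates the chunk-size arithmetic and the `slice::chunks` pattern the fix relies on.

```rust
/// Illustrative constant: PostgreSQL's hard cap on bind parameters per statement.
const PG_BIND_PARAM_LIMIT: usize = 65_535;

/// Hypothetical row with three bound columns (e.g. tx_sequence_number, object_id, kind).
#[allow(dead_code)]
struct Row {
    tx_sequence_number: i64,
    object_id: [u8; 32],
    kind: i16,
}

/// Bind parameters contributed by each row of the hypothetical table.
const PARAMS_PER_ROW: usize = 3;

fn main() {
    // A pathological transaction: many affected objects -> many rows in one insert.
    let rows: Vec<Row> = (0..100_000)
        .map(|i| Row {
            tx_sequence_number: 1,
            object_id: [0u8; 32],
            kind: (i % 3) as i16,
        })
        .collect();

    // Unchunked, this insert would try to bind 300_000 parameters and fail.
    assert!(rows.len() * PARAMS_PER_ROW > PG_BIND_PARAM_LIMIT);

    // Largest row count that keeps a single statement under the limit.
    let chunk_size = PG_BIND_PARAM_LIMIT / PARAMS_PER_ROW;

    // The fix: issue one insert per chunk instead of one giant insert.
    for chunk in rows.chunks(chunk_size) {
        assert!(chunk.len() * PARAMS_PER_ROW <= PG_BIND_PARAM_LIMIT);
        // In the real store, this is where the per-table
        // insert_into(...).values(chunk).on_conflict_do_nothing().execute(conn)
        // call runs, once per chunk.
        println!("would insert {} rows", chunk.len());
    }
}
```

Because the patch keeps the chunk loops inside the same `transaction_with_retry` closure, the smaller statements still commit together rather than as independent writes.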