From db7e232fbdd26d6dd79f19f1cb8557d50a3a21d1 Mon Sep 17 00:00:00 2001 From: Ge Gao <106119108+gegaowp@users.noreply.github.com> Date: Fri, 11 Oct 2024 10:47:07 -0400 Subject: [PATCH] indexer fix: chunk to avoid PG parameter limit (#19754) (#19789) ## Description cherry-pick pr ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- Cargo.lock | 1 + crates/sui-indexer/Cargo.toml | 1 + .../sui-indexer/src/store/pg_indexer_store.rs | 253 +++++++++++------- crates/sui-indexer/src/types.rs | 57 +++- 4 files changed, 214 insertions(+), 98 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37e8e694e1600..b28d1deea3b98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13638,6 +13638,7 @@ dependencies = [ "ntest", "object_store", "prometheus", + "rand 0.8.5", "rayon", "regex", "serde", diff --git a/crates/sui-indexer/Cargo.toml b/crates/sui-indexer/Cargo.toml index 573a50a6f0a9a..0d6acba5a9924 100644 --- a/crates/sui-indexer/Cargo.toml +++ b/crates/sui-indexer/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow.workspace = true +rand = "0.8.5" async-trait.workspace = true axum.workspace = true backoff.workspace = true diff --git a/crates/sui-indexer/src/store/pg_indexer_store.rs b/crates/sui-indexer/src/store/pg_indexer_store.rs index 13d6f63c4d946..72dfa38946a0a 100644 --- a/crates/sui-indexer/src/store/pg_indexer_store.rs +++ b/crates/sui-indexer/src/store/pg_indexer_store.rs @@ -935,48 +935,73 @@ impl PgIndexerStore { transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { - diesel::insert_into(event_emit_package::table) - .values(&event_emit_packages) - .on_conflict_do_nothing() - .execute(conn) - .await?; - - diesel::insert_into(event_emit_module::table) - .values(&event_emit_modules) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_emit_packages_chunk in + event_emit_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_emit_package::table) + .values(event_emit_packages_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_senders::table) - .values(&event_senders) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_emit_modules_chunk in + event_emit_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_emit_module::table) + .values(event_emit_modules_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_package::table) - .values(&event_struct_packages) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_senders_chunk in event_senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(event_senders::table) + .values(event_senders_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_module::table) - .values(&event_struct_modules) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_packages_chunk in + event_struct_packages.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_package::table) + .values(event_struct_packages_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_name::table) - .values(&event_struct_names) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_modules_chunk in + event_struct_modules.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_module::table) + .values(event_struct_modules_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(event_struct_instantiation::table) - .values(&event_struct_instantiations) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for event_struct_names_chunk in + event_struct_names.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_name::table) + .values(event_struct_names_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } + for event_struct_instantiations_chunk in + event_struct_instantiations.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(event_struct_instantiation::table) + .values(event_struct_instantiations_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } Ok(()) } .scope_boxed() @@ -1065,71 +1090,99 @@ impl PgIndexerStore { transaction_with_retry(&self.pool, PG_DB_COMMIT_SLEEP_DURATION, |conn| { async { - diesel::insert_into(tx_affected_addresses::table) - .values(&affected_addresses) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for affected_addresses_chunk in + affected_addresses.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_affected_addresses::table) + .values(affected_addresses_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_affected_objects::table) - .values(&affected_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for affected_objects_chunk in + affected_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_affected_objects::table) + .values(affected_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_senders::table) - .values(&senders) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for senders_chunk in senders.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_senders::table) + .values(senders_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_recipients::table) - .values(&recipients) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for recipients_chunk in recipients.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_recipients::table) + .values(recipients_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_input_objects::table) - .values(&input_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for input_objects_chunk in input_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_input_objects::table) + .values(input_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_changed_objects::table) - .values(&changed_objects) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for changed_objects_chunk in + changed_objects.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) + { + diesel::insert_into(tx_changed_objects::table) + .values(changed_objects_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_pkg::table) - .values(&pkgs) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for pkgs_chunk in pkgs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_pkg::table) + .values(pkgs_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_mod::table) - .values(&mods) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for mods_chunk in mods.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_mod::table) + .values(mods_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_calls_fun::table) - .values(&funs) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for funs_chunk in funs.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_calls_fun::table) + .values(funs_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_digests::table) - .values(&digests) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for digests_chunk in digests.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_digests::table) + .values(digests_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } - diesel::insert_into(tx_kinds::table) - .values(&kinds) - .on_conflict_do_nothing() - .execute(conn) - .await?; + for kinds_chunk in kinds.chunks(PG_COMMIT_CHUNK_SIZE_INTRA_DB_TX) { + diesel::insert_into(tx_kinds::table) + .values(kinds_chunk) + .on_conflict_do_nothing() + .execute(conn) + .await?; + } Ok(()) } @@ -1872,9 +1925,12 @@ impl IndexerStore for PgIndexerStore { "Failed to persist all event_indices chunks: {:?}", e )) - })?; - let elapsed = guard.stop_and_record(); - info!(elapsed, "Persisted {} event_indices chunks", len); + }) + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} event_indices chunks", len); + }) + .tap_err(|e| tracing::error!("Failed to persist all event_indices chunks: {:?}", e))?; Ok(()) } @@ -1902,9 +1958,12 @@ impl IndexerStore for PgIndexerStore { "Failed to persist all tx_indices chunks: {:?}", e )) - })?; - let elapsed = guard.stop_and_record(); - info!(elapsed, "Persisted {} tx_indices chunks", len); + }) + .tap_ok(|_| { + let elapsed = guard.stop_and_record(); + info!(elapsed, "Persisted {} tx_indices chunks", len); + }) + .tap_err(|e| tracing::error!("Failed to persist all tx_indices chunks: {:?}", e))?; Ok(()) } diff --git a/crates/sui-indexer/src/types.rs b/crates/sui-indexer/src/types.rs index ee2ec93410d4c..81ce3bbb8de5f 100644 --- a/crates/sui-indexer/src/types.rs +++ b/crates/sui-indexer/src/types.rs @@ -1,8 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::errors::IndexerError; use move_core_types::language_storage::StructTag; +use rand::Rng; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use sui_json_rpc_types::{ @@ -25,6 +25,8 @@ use sui_types::sui_serde::SuiStructTag; use sui_types::sui_system_state::sui_system_state_summary::SuiSystemStateSummary; use sui_types::transaction::SenderSignedData; +use crate::errors::IndexerError; + pub type IndexerResult = Result; #[derive(Debug, Default)] @@ -248,6 +250,24 @@ pub struct EventIndex { pub type_instantiation: String, } +// for ingestion test +impl EventIndex { + pub fn random() -> Self { + let mut rng = rand::thread_rng(); + EventIndex { + tx_sequence_number: rng.gen(), + event_sequence_number: rng.gen(), + sender: SuiAddress::random_for_testing_only(), + emit_package: ObjectID::random(), + emit_module: rng.gen::().to_string(), + type_package: ObjectID::random(), + type_module: rng.gen::().to_string(), + type_name: rng.gen::().to_string(), + type_instantiation: rng.gen::().to_string(), + } + } +} + impl EventIndex { pub fn from_event( tx_sequence_number: u64, @@ -408,6 +428,41 @@ pub struct TxIndex { pub move_calls: Vec<(ObjectID, String, String)>, } +impl TxIndex { + pub fn random() -> Self { + let mut rng = rand::thread_rng(); + TxIndex { + tx_sequence_number: rng.gen(), + tx_kind: if rng.gen_bool(0.5) { + TransactionKind::SystemTransaction + } else { + TransactionKind::ProgrammableTransaction + }, + transaction_digest: TransactionDigest::random(), + checkpoint_sequence_number: rng.gen(), + input_objects: (0..1000).map(|_| ObjectID::random()).collect(), + changed_objects: (0..1000).map(|_| ObjectID::random()).collect(), + affected_objects: (0..1000).map(|_| ObjectID::random()).collect(), + payers: (0..rng.gen_range(0..100)) + .map(|_| SuiAddress::random_for_testing_only()) + .collect(), + sender: SuiAddress::random_for_testing_only(), + recipients: (0..rng.gen_range(0..1000)) + .map(|_| SuiAddress::random_for_testing_only()) + .collect(), + move_calls: (0..rng.gen_range(0..1000)) + .map(|_| { + ( + ObjectID::random(), + rng.gen::().to_string(), + rng.gen::().to_string(), + ) + }) + .collect(), + } + } +} + // ObjectChange is not bcs deserializable, IndexedObjectChange is. #[serde_as] #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]