diff --git a/crates/sui-indexer/src/apis/read_api.rs b/crates/sui-indexer/src/apis/read_api.rs index 8fedbf32276b0..ca71dfd5d7644 100644 --- a/crates/sui-indexer/src/apis/read_api.rs +++ b/crates/sui-indexer/src/apis/read_api.rs @@ -34,11 +34,7 @@ impl ReadApi { } async fn get_checkpoint(&self, id: CheckpointId) -> Result { - match self - .inner - .spawn_blocking(move |this| this.get_checkpoint(id)) - .await - { + match self.inner.get_checkpoint(id).await { Ok(Some(epoch_info)) => Ok(epoch_info), Ok(None) => Err(IndexerError::InvalidArgumentError(format!( "Checkpoint {id:?} not found" diff --git a/crates/sui-indexer/src/errors.rs b/crates/sui-indexer/src/errors.rs index 77e26b08637e4..8a1303dbafae3 100644 --- a/crates/sui-indexer/src/errors.rs +++ b/crates/sui-indexer/src/errors.rs @@ -152,3 +152,9 @@ impl From for IndexerError { IndexerError::UncategorizedError(anyhow::Error::from(value)) } } + +impl From for IndexerError { + fn from(value: diesel_async::pooled_connection::bb8::RunError) -> Self { + Self::PgPoolConnectionError(value.to_string()) + } +} diff --git a/crates/sui-indexer/src/indexer_reader.rs b/crates/sui-indexer/src/indexer_reader.rs index e70e0f0832061..82efbd05a0fd4 100644 --- a/crates/sui-indexer/src/indexer_reader.rs +++ b/crates/sui-indexer/src/indexer_reader.rs @@ -9,9 +9,10 @@ use cached::{Cached, SizedCache}; use diesel::PgConnection; use diesel::{ dsl::sql, r2d2::ConnectionManager, sql_types::Bool, ExpressionMethods, OptionalExtension, - QueryDsl, RunQueryDsl, TextExpressionMethods, + QueryDsl, TextExpressionMethods, }; use itertools::{any, Itertools}; +use tap::Pipe; use tap::TapFallible; use fastcrypto::encoding::Encoding; @@ -155,6 +156,7 @@ impl IndexerReader { object_id: &ObjectID, version: Option, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let object_id = object_id.to_vec(); let stored_object = run_query!(&self.blocking_pool, |conn| { @@ -213,6 +215,7 @@ impl IndexerReader { } fn get_object_raw(&self, object_id: ObjectID) -> Result, IndexerError> { + use diesel::RunQueryDsl; let id = object_id.to_vec(); let stored_object = run_query!(&self.blocking_pool, |conn| { objects::dsl::objects @@ -243,6 +246,7 @@ impl IndexerReader { &self, epoch: Option, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let stored_epoch = run_query!(&self.blocking_pool, |conn| { if let Some(epoch) = epoch { epochs::dsl::epochs @@ -261,6 +265,7 @@ impl IndexerReader { } pub fn get_latest_epoch_info_from_db(&self) -> Result { + use diesel::RunQueryDsl; let stored_epoch = run_query!(&self.blocking_pool, |conn| { epochs::dsl::epochs .order_by(epochs::epoch.desc()) @@ -291,6 +296,7 @@ impl IndexerReader { limit: usize, descending_order: bool, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; run_query!(&self.blocking_pool, |conn| { let mut boxed_query = epochs::table.into_boxed(); if let Some(cursor) = cursor { @@ -355,27 +361,32 @@ impl IndexerReader { Ok(system_state) } - pub fn get_checkpoint_from_db( + async fn get_checkpoint_from_db( &self, checkpoint_id: CheckpointId, ) -> Result, IndexerError> { - let stored_checkpoint = run_query!(&self.blocking_pool, |conn| { - match checkpoint_id { - CheckpointId::SequenceNumber(seq) => checkpoints::dsl::checkpoints - .filter(checkpoints::sequence_number.eq(seq as i64)) - .first::(conn) - .optional(), - CheckpointId::Digest(digest) => checkpoints::dsl::checkpoints - .filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec())) - .first::(conn) - .optional(), - } - })?; + use diesel_async::RunQueryDsl; + + let mut connection = self.pool.get().await?; + let stored_checkpoint = checkpoints::table + .into_boxed() + .pipe(|query| match checkpoint_id { + CheckpointId::SequenceNumber(seq) => { + query.filter(checkpoints::sequence_number.eq(seq as i64)) + } + CheckpointId::Digest(digest) => { + query.filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec())) + } + }) + .first::(&mut connection) + .await + .optional()?; Ok(stored_checkpoint) } pub fn get_latest_checkpoint_from_db(&self) -> Result { + use diesel::RunQueryDsl; let stored_checkpoint = run_query!(&self.blocking_pool, |conn| { checkpoints::dsl::checkpoints .order_by(checkpoints::sequence_number.desc()) @@ -385,11 +396,11 @@ impl IndexerReader { Ok(stored_checkpoint) } - pub fn get_checkpoint( + pub async fn get_checkpoint( &self, checkpoint_id: CheckpointId, ) -> Result, IndexerError> { - let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id)? { + let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id).await? { Some(stored_checkpoint) => stored_checkpoint, None => return Ok(None), }; @@ -410,6 +421,7 @@ impl IndexerReader { limit: usize, descending_order: bool, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; run_query!(&self.blocking_pool, |conn| { let mut boxed_query = checkpoints::table.into_boxed(); if let Some(cursor) = cursor { @@ -449,6 +461,7 @@ impl IndexerReader { &self, digest: TransactionDigest, ) -> Result { + use diesel::RunQueryDsl; let stored_txn: StoredTransaction = run_query!(&self.blocking_pool, |conn| { transactions::table .filter(transactions::transaction_digest.eq(digest.into_inner().to_vec())) @@ -462,6 +475,7 @@ impl IndexerReader { &self, sequence_number: i64, ) -> Result { + use diesel::RunQueryDsl; let stored_txn: StoredTransaction = run_query!(&self.blocking_pool, |conn| { transactions::table .filter(transactions::tx_sequence_number.eq(sequence_number)) @@ -475,6 +489,7 @@ impl IndexerReader { &self, digests: &[TransactionDigest], ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let digests = digests .iter() .map(|digest| digest.inner().to_vec()) @@ -526,6 +541,7 @@ impl IndexerReader { // Some(true) for desc, Some(false) for asc, None for undefined order is_descending: Option, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let mut query = transactions::table .filter(transactions::tx_sequence_number.eq_any(tx_sequence_numbers)) .into_boxed(); @@ -560,6 +576,7 @@ impl IndexerReader { cursor: Option, limit: usize, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; run_query!(&self.blocking_pool, |conn| { let mut query = objects::dsl::objects .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16)) @@ -637,6 +654,7 @@ impl IndexerReader { object_ids: Vec, object_type: String, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec(); let filtered_ids = run_query!(&self.blocking_pool, |conn| { objects::dsl::objects @@ -671,6 +689,7 @@ impl IndexerReader { &self, object_ids: Vec, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec(); run_query!(&self.blocking_pool, |conn| { objects::dsl::objects @@ -687,6 +706,7 @@ impl IndexerReader { limit: usize, is_descending: bool, ) -> IndexerResult> { + use diesel::RunQueryDsl; let mut query = transactions::dsl::transactions .filter(transactions::dsl::checkpoint_sequence_number.eq(checkpoint_seq as i64)) .into_boxed(); @@ -732,6 +752,7 @@ impl IndexerReader { limit: usize, is_descending: bool, ) -> IndexerResult> { + use diesel::RunQueryDsl; let cursor_tx_seq = if let Some(cursor) = cursor { let pool = self.get_pool(); let tx_seq = run_query_async!(&pool, move |conn| { @@ -977,6 +998,7 @@ impl IndexerReader { &self, digest: TransactionDigest, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let pool = self.get_pool(); let (timestamp_ms, serialized_events) = run_query_async!(&pool, move |conn| { transactions::table @@ -1044,6 +1066,7 @@ impl IndexerReader { limit: usize, descending_order: bool, ) -> IndexerResult> { + use diesel::RunQueryDsl; let pool = self.get_pool(); let (tx_seq, event_seq) = if let Some(cursor) = cursor { let EventID { @@ -1265,6 +1288,7 @@ impl IndexerReader { cursor: Option, limit: usize, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let objects: Vec = run_query!(&self.blocking_pool, |conn| { let mut query = objects::dsl::objects .filter(objects::dsl::owner_type.eq(OwnerType::Object as i16)) @@ -1304,6 +1328,7 @@ impl IndexerReader { &self, object_ids: Vec>, ) -> IndexerResult> { + use diesel::RunQueryDsl; run_query!(&self.blocking_pool, |conn| { let query = objects::dsl::objects .select(( @@ -1349,6 +1374,7 @@ impl IndexerReader { &self, object_type: String, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let stored_display = run_query!(&self.blocking_pool, |conn| { display::table .filter(display::object_type.eq(object_type)) @@ -1385,6 +1411,7 @@ impl IndexerReader { cursor: ObjectID, limit: usize, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let mut query = objects::dsl::objects .filter(objects::dsl::owner_type.eq(OwnerType::Address as i16)) .filter(objects::dsl::owner_id.eq(owner.to_vec())) @@ -1424,6 +1451,7 @@ impl IndexerReader { // If coin_type is None, look for all coins. coin_type: Option, ) -> Result, IndexerError> { + use diesel::RunQueryDsl; let coin_type_filter = if let Some(coin_type) = coin_type { format!("= '{}'", coin_type) } else { @@ -1548,6 +1576,7 @@ impl IndexerReader { } pub fn get_consistent_read_range(&self) -> Result<(i64, i64), IndexerError> { + use diesel::RunQueryDsl; let latest_checkpoint_sequence = run_query!(&self.blocking_pool, |conn| { checkpoints::table .select(checkpoints::sequence_number)