Skip to content

Commit

Permalink
indexer: use async connection for get_checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bmwill committed Sep 4, 2024
1 parent be5bdad commit 9b6fcb2
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 21 deletions.
6 changes: 1 addition & 5 deletions crates/sui-indexer/src/apis/read_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,7 @@ impl ReadApi {
}

async fn get_checkpoint(&self, id: CheckpointId) -> Result<Checkpoint, IndexerError> {
match self
.inner
.spawn_blocking(move |this| this.get_checkpoint(id))
.await
{
match self.inner.get_checkpoint(id).await {
Ok(Some(epoch_info)) => Ok(epoch_info),
Ok(None) => Err(IndexerError::InvalidArgumentError(format!(
"Checkpoint {id:?} not found"
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-indexer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,9 @@ impl From<tokio::task::JoinError> for IndexerError {
IndexerError::UncategorizedError(anyhow::Error::from(value))
}
}

impl From<diesel_async::pooled_connection::bb8::RunError> for IndexerError {
fn from(value: diesel_async::pooled_connection::bb8::RunError) -> Self {
Self::PgPoolConnectionError(value.to_string())
}
}
61 changes: 45 additions & 16 deletions crates/sui-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ use cached::{Cached, SizedCache};
use diesel::PgConnection;
use diesel::{
dsl::sql, r2d2::ConnectionManager, sql_types::Bool, ExpressionMethods, OptionalExtension,
QueryDsl, RunQueryDsl, TextExpressionMethods,
QueryDsl, TextExpressionMethods,
};
use itertools::{any, Itertools};
use tap::Pipe;
use tap::TapFallible;

use fastcrypto::encoding::Encoding;
Expand Down Expand Up @@ -155,6 +156,7 @@ impl IndexerReader {
object_id: &ObjectID,
version: Option<VersionNumber>,
) -> Result<Option<StoredObject>, IndexerError> {
use diesel::RunQueryDsl;
let object_id = object_id.to_vec();

let stored_object = run_query!(&self.blocking_pool, |conn| {
Expand Down Expand Up @@ -213,6 +215,7 @@ impl IndexerReader {
}

fn get_object_raw(&self, object_id: ObjectID) -> Result<Option<StoredObject>, IndexerError> {
use diesel::RunQueryDsl;
let id = object_id.to_vec();
let stored_object = run_query!(&self.blocking_pool, |conn| {
objects::dsl::objects
Expand Down Expand Up @@ -243,6 +246,7 @@ impl IndexerReader {
&self,
epoch: Option<EpochId>,
) -> Result<Option<StoredEpochInfo>, IndexerError> {
use diesel::RunQueryDsl;
let stored_epoch = run_query!(&self.blocking_pool, |conn| {
if let Some(epoch) = epoch {
epochs::dsl::epochs
Expand All @@ -261,6 +265,7 @@ impl IndexerReader {
}

pub fn get_latest_epoch_info_from_db(&self) -> Result<StoredEpochInfo, IndexerError> {
use diesel::RunQueryDsl;
let stored_epoch = run_query!(&self.blocking_pool, |conn| {
epochs::dsl::epochs
.order_by(epochs::epoch.desc())
Expand Down Expand Up @@ -291,6 +296,7 @@ impl IndexerReader {
limit: usize,
descending_order: bool,
) -> Result<Vec<StoredEpochInfo>, IndexerError> {
use diesel::RunQueryDsl;
run_query!(&self.blocking_pool, |conn| {
let mut boxed_query = epochs::table.into_boxed();
if let Some(cursor) = cursor {
Expand Down Expand Up @@ -355,27 +361,32 @@ impl IndexerReader {
Ok(system_state)
}

pub fn get_checkpoint_from_db(
async fn get_checkpoint_from_db(
&self,
checkpoint_id: CheckpointId,
) -> Result<Option<StoredCheckpoint>, IndexerError> {
let stored_checkpoint = run_query!(&self.blocking_pool, |conn| {
match checkpoint_id {
CheckpointId::SequenceNumber(seq) => checkpoints::dsl::checkpoints
.filter(checkpoints::sequence_number.eq(seq as i64))
.first::<StoredCheckpoint>(conn)
.optional(),
CheckpointId::Digest(digest) => checkpoints::dsl::checkpoints
.filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec()))
.first::<StoredCheckpoint>(conn)
.optional(),
}
})?;
use diesel_async::RunQueryDsl;

let mut connection = self.pool.get().await?;
let stored_checkpoint = checkpoints::table
.into_boxed()
.pipe(|query| match checkpoint_id {
CheckpointId::SequenceNumber(seq) => {
query.filter(checkpoints::sequence_number.eq(seq as i64))
}
CheckpointId::Digest(digest) => {
query.filter(checkpoints::checkpoint_digest.eq(digest.into_inner().to_vec()))
}
})
.first::<StoredCheckpoint>(&mut connection)
.await
.optional()?;

Ok(stored_checkpoint)
}

pub fn get_latest_checkpoint_from_db(&self) -> Result<StoredCheckpoint, IndexerError> {
use diesel::RunQueryDsl;
let stored_checkpoint = run_query!(&self.blocking_pool, |conn| {
checkpoints::dsl::checkpoints
.order_by(checkpoints::sequence_number.desc())
Expand All @@ -385,11 +396,11 @@ impl IndexerReader {
Ok(stored_checkpoint)
}

pub fn get_checkpoint(
pub async fn get_checkpoint(
&self,
checkpoint_id: CheckpointId,
) -> Result<Option<sui_json_rpc_types::Checkpoint>, IndexerError> {
let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id)? {
let stored_checkpoint = match self.get_checkpoint_from_db(checkpoint_id).await? {
Some(stored_checkpoint) => stored_checkpoint,
None => return Ok(None),
};
Expand All @@ -410,6 +421,7 @@ impl IndexerReader {
limit: usize,
descending_order: bool,
) -> Result<Vec<StoredCheckpoint>, IndexerError> {
use diesel::RunQueryDsl;
run_query!(&self.blocking_pool, |conn| {
let mut boxed_query = checkpoints::table.into_boxed();
if let Some(cursor) = cursor {
Expand Down Expand Up @@ -449,6 +461,7 @@ impl IndexerReader {
&self,
digest: TransactionDigest,
) -> Result<SuiTransactionBlockEffects, IndexerError> {
use diesel::RunQueryDsl;
let stored_txn: StoredTransaction = run_query!(&self.blocking_pool, |conn| {
transactions::table
.filter(transactions::transaction_digest.eq(digest.into_inner().to_vec()))
Expand All @@ -462,6 +475,7 @@ impl IndexerReader {
&self,
sequence_number: i64,
) -> Result<SuiTransactionBlockEffects, IndexerError> {
use diesel::RunQueryDsl;
let stored_txn: StoredTransaction = run_query!(&self.blocking_pool, |conn| {
transactions::table
.filter(transactions::tx_sequence_number.eq(sequence_number))
Expand All @@ -475,6 +489,7 @@ impl IndexerReader {
&self,
digests: &[TransactionDigest],
) -> Result<Vec<StoredTransaction>, IndexerError> {
use diesel::RunQueryDsl;
let digests = digests
.iter()
.map(|digest| digest.inner().to_vec())
Expand Down Expand Up @@ -526,6 +541,7 @@ impl IndexerReader {
// Some(true) for desc, Some(false) for asc, None for undefined order
is_descending: Option<bool>,
) -> Result<Vec<StoredTransaction>, IndexerError> {
use diesel::RunQueryDsl;
let mut query = transactions::table
.filter(transactions::tx_sequence_number.eq_any(tx_sequence_numbers))
.into_boxed();
Expand Down Expand Up @@ -560,6 +576,7 @@ impl IndexerReader {
cursor: Option<ObjectID>,
limit: usize,
) -> Result<Vec<StoredObject>, IndexerError> {
use diesel::RunQueryDsl;
run_query!(&self.blocking_pool, |conn| {
let mut query = objects::dsl::objects
.filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
Expand Down Expand Up @@ -637,6 +654,7 @@ impl IndexerReader {
object_ids: Vec<ObjectID>,
object_type: String,
) -> Result<Vec<ObjectID>, IndexerError> {
use diesel::RunQueryDsl;
let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
let filtered_ids = run_query!(&self.blocking_pool, |conn| {
objects::dsl::objects
Expand Down Expand Up @@ -671,6 +689,7 @@ impl IndexerReader {
&self,
object_ids: Vec<ObjectID>,
) -> Result<Vec<StoredObject>, IndexerError> {
use diesel::RunQueryDsl;
let object_ids = object_ids.into_iter().map(|id| id.to_vec()).collect_vec();
run_query!(&self.blocking_pool, |conn| {
objects::dsl::objects
Expand All @@ -687,6 +706,7 @@ impl IndexerReader {
limit: usize,
is_descending: bool,
) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
use diesel::RunQueryDsl;
let mut query = transactions::dsl::transactions
.filter(transactions::dsl::checkpoint_sequence_number.eq(checkpoint_seq as i64))
.into_boxed();
Expand Down Expand Up @@ -732,6 +752,7 @@ impl IndexerReader {
limit: usize,
is_descending: bool,
) -> IndexerResult<Vec<SuiTransactionBlockResponse>> {
use diesel::RunQueryDsl;
let cursor_tx_seq = if let Some(cursor) = cursor {
let pool = self.get_pool();
let tx_seq = run_query_async!(&pool, move |conn| {
Expand Down Expand Up @@ -977,6 +998,7 @@ impl IndexerReader {
&self,
digest: TransactionDigest,
) -> Result<Vec<sui_json_rpc_types::SuiEvent>, IndexerError> {
use diesel::RunQueryDsl;
let pool = self.get_pool();
let (timestamp_ms, serialized_events) = run_query_async!(&pool, move |conn| {
transactions::table
Expand Down Expand Up @@ -1044,6 +1066,7 @@ impl IndexerReader {
limit: usize,
descending_order: bool,
) -> IndexerResult<Vec<SuiEvent>> {
use diesel::RunQueryDsl;
let pool = self.get_pool();
let (tx_seq, event_seq) = if let Some(cursor) = cursor {
let EventID {
Expand Down Expand Up @@ -1265,6 +1288,7 @@ impl IndexerReader {
cursor: Option<ObjectID>,
limit: usize,
) -> Result<Vec<StoredObject>, IndexerError> {
use diesel::RunQueryDsl;
let objects: Vec<StoredObject> = run_query!(&self.blocking_pool, |conn| {
let mut query = objects::dsl::objects
.filter(objects::dsl::owner_type.eq(OwnerType::Object as i16))
Expand Down Expand Up @@ -1304,6 +1328,7 @@ impl IndexerReader {
&self,
object_ids: Vec<Vec<u8>>,
) -> IndexerResult<HashMap<ObjectID, ObjectRef>> {
use diesel::RunQueryDsl;
run_query!(&self.blocking_pool, |conn| {
let query = objects::dsl::objects
.select((
Expand Down Expand Up @@ -1349,6 +1374,7 @@ impl IndexerReader {
&self,
object_type: String,
) -> Result<Option<sui_types::display::DisplayVersionUpdatedEvent>, IndexerError> {
use diesel::RunQueryDsl;
let stored_display = run_query!(&self.blocking_pool, |conn| {
display::table
.filter(display::object_type.eq(object_type))
Expand Down Expand Up @@ -1385,6 +1411,7 @@ impl IndexerReader {
cursor: ObjectID,
limit: usize,
) -> Result<Vec<SuiCoin>, IndexerError> {
use diesel::RunQueryDsl;
let mut query = objects::dsl::objects
.filter(objects::dsl::owner_type.eq(OwnerType::Address as i16))
.filter(objects::dsl::owner_id.eq(owner.to_vec()))
Expand Down Expand Up @@ -1424,6 +1451,7 @@ impl IndexerReader {
// If coin_type is None, look for all coins.
coin_type: Option<String>,
) -> Result<Vec<Balance>, IndexerError> {
use diesel::RunQueryDsl;
let coin_type_filter = if let Some(coin_type) = coin_type {
format!("= '{}'", coin_type)
} else {
Expand Down Expand Up @@ -1548,6 +1576,7 @@ impl IndexerReader {
}

pub fn get_consistent_read_range(&self) -> Result<(i64, i64), IndexerError> {
use diesel::RunQueryDsl;
let latest_checkpoint_sequence = run_query!(&self.blocking_pool, |conn| {
checkpoints::table
.select(checkpoints::sequence_number)
Expand Down

0 comments on commit 9b6fcb2

Please sign in to comment.