Skip to content

Commit

Permalink
[Statedb] Refactor Child object export (rooch-network#1597)
Browse files Browse the repository at this point in the history
* fixup inscription id test cases

* finish statedb dump and apply
  • Loading branch information
baichuan3 authored Apr 21, 2024
1 parent 4bf8565 commit 4eb3547
Show file tree
Hide file tree
Showing 28 changed files with 291 additions and 709 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ use bitcoin::{Block, OutPoint, Transaction, TxOut};
use hex::FromHex;
use moveos_types::access_path::AccessPath;
use moveos_types::module_binding::MoveFunctionCaller;
use moveos_types::moveos_std::object;
use moveos_types::state::MoveStructType;
use moveos_types::state_resolver::StateReader;
use rooch_types::bitcoin::ord;
use rooch_types::bitcoin::ord::{Inscription, InscriptionID};
use rooch_types::bitcoin::types::{self, Header};
use rooch_types::bitcoin::utxo::{self, UTXO};
Expand Down Expand Up @@ -179,7 +178,7 @@ fn check_utxo(txs: Vec<Transaction>, binding_test: &binding_test::RustBindingTes
txid_address, index
);
let inscription_id = InscriptionID::new(txid_address, index);
let object_id = object::custom_object_id(&inscription_id, &Inscription::struct_tag());
let object_id = ord::derive_inscription_id(&inscription_id);
let inscription_state = moveos_resolver
.get_states(AccessPath::object(object_id))
.unwrap()
Expand Down

This file was deleted.

This file was deleted.

22 changes: 0 additions & 22 deletions crates/rooch-indexer/src/actor/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,28 +275,6 @@ impl Handler<IndexerStatesMessage> for IndexerActor {
self.indexer_store
.delete_field_states_by_object_id(remove_field_states_by_object_id)?;

// TODO Temporarily close StateChangeSet Indexer writing and wait for the function to be turned on.
// Store table change set for state sync
// let mut split_state_change_set = SplitStateChangeSet::default();

// for (object_id, table_change) in state_change_set.changes.clone() {
// split_state_change_set.add_table_change(object_id, table_change);
// }

// let mut indexed_table_change_sets = vec![];
// for (index, item) in split_state_change_set
// .table_change_sets
// .into_iter()
// .enumerate()
// {
// let table_change_set =
// IndexedTableChangeSet::new(tx_order, index as u64, item.0, item.1)?;

// indexed_table_change_sets.push(table_change_set);
// }

// self.indexer_store
// .persist_table_change_sets(indexed_table_change_sets)?;
Ok(())
}
}
Expand Down
17 changes: 1 addition & 16 deletions crates/rooch-indexer/src/actor/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use moveos_types::state::StateChangeSet;
use moveos_types::transaction::{TransactionExecutionInfo, VerifiedMoveOSTransaction};
use rooch_types::indexer::event_filter::{EventFilter, IndexerEvent, IndexerEventID};
use rooch_types::indexer::state::{
FieldStateFilter, IndexerFieldState, IndexerObjectState, IndexerStateID, IndexerTableChangeSet,
ObjectStateFilter, StateSyncFilter,
FieldStateFilter, IndexerFieldState, IndexerObjectState, IndexerStateID, ObjectStateFilter,
};
use rooch_types::indexer::transaction_filter::TransactionFilter;
use rooch_types::transaction::LedgerTransaction;
Expand Down Expand Up @@ -109,17 +108,3 @@ pub struct QueryIndexerFieldStatesMessage {
impl Message for QueryIndexerFieldStatesMessage {
type Result = Result<Vec<IndexerFieldState>>;
}

/// Sync Indexer State change sets Message
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncIndexerStatesMessage {
pub filter: Option<StateSyncFilter>,
// exclusive cursor if `Some`, otherwise start from the beginning
pub cursor: Option<IndexerStateID>,
pub limit: usize,
pub descending_order: bool,
}

impl Message for SyncIndexerStatesMessage {
type Result = Result<Vec<IndexerTableChangeSet>>;
}
28 changes: 2 additions & 26 deletions crates/rooch-indexer/src/actor/reader_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@

use crate::actor::messages::{
QueryIndexerEventsMessage, QueryIndexerFieldStatesMessage, QueryIndexerObjectStatesMessage,
QueryIndexerTransactionsMessage, SyncIndexerStatesMessage,
QueryIndexerTransactionsMessage,
};
use crate::indexer_reader::IndexerReader;
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use coerce::actor::{context::ActorContext, message::Handler, Actor};
use rooch_types::indexer::event_filter::IndexerEvent;
use rooch_types::indexer::state::{IndexerFieldState, IndexerObjectState, IndexerTableChangeSet};
use rooch_types::indexer::state::{IndexerFieldState, IndexerObjectState};
use rooch_types::transaction::TransactionWithInfo;

pub struct IndexerReaderActor {
Expand Down Expand Up @@ -100,27 +100,3 @@ impl Handler<QueryIndexerFieldStatesMessage> for IndexerReaderActor {
.map_err(|e| anyhow!(format!("Failed to query indexer table states: {:?}", e)))
}
}

#[async_trait]
impl Handler<SyncIndexerStatesMessage> for IndexerReaderActor {
async fn handle(
&mut self,
msg: SyncIndexerStatesMessage,
_ctx: &mut ActorContext,
) -> Result<Vec<IndexerTableChangeSet>> {
let SyncIndexerStatesMessage {
filter,
cursor,
limit,
descending_order,
} = msg;
self.indexer_reader
.sync_states(filter, cursor, limit, descending_order)
.map_err(|e| {
anyhow!(format!(
"Failed to query indexer state change sets: {:?}",
e
))
})
}
}
97 changes: 4 additions & 93 deletions crates/rooch-indexer/src/indexer_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,24 @@
// SPDX-License-Identifier: Apache-2.0

use crate::models::events::StoredEvent;
use crate::models::states::{StoredFieldState, StoredObjectState, StoredTableChangeSet};
use crate::models::states::{StoredFieldState, StoredObjectState};
use crate::models::transactions::StoredTransaction;
use crate::schema::object_states;
use crate::schema::{events, field_states, table_change_sets, transactions};
use crate::schema::{events, field_states, transactions};
use crate::types::IndexerResult;
use crate::utils::format_struct_tag;
use crate::{
errors::IndexerError, IndexerStoreMeta, SqliteConnectionConfig, SqliteConnectionPoolConfig,
SqlitePoolConnection, INDEXER_EVENTS_TABLE_NAME, INDEXER_FIELD_STATES_TABLE_NAME,
INDEXER_OBJECT_STATES_TABLE_NAME, INDEXER_TABLE_CHANGE_SETS_TABLE_NAME,
INDEXER_TRANSACTIONS_TABLE_NAME,
INDEXER_OBJECT_STATES_TABLE_NAME, INDEXER_TRANSACTIONS_TABLE_NAME,
};
use anyhow::{anyhow, Result};
use diesel::{
r2d2::ConnectionManager, Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SqliteConnection,
};
use rooch_types::indexer::event_filter::{EventFilter, IndexerEvent, IndexerEventID};
use rooch_types::indexer::state::{
FieldStateFilter, IndexerFieldState, IndexerObjectState, IndexerStateID, IndexerTableChangeSet,
ObjectStateFilter, StateSyncFilter,
FieldStateFilter, IndexerFieldState, IndexerObjectState, IndexerStateID, ObjectStateFilter,
};
use rooch_types::indexer::transaction_filter::TransactionFilter;
use rooch_types::transaction::TransactionWithInfo;
Expand Down Expand Up @@ -502,91 +500,4 @@ impl IndexerReader {

Ok(result)
}

pub fn sync_states(
&self,
filter: Option<StateSyncFilter>,
// exclusive cursor if `Some`, otherwise start from the beginning
cursor: Option<IndexerStateID>,
limit: usize,
descending_order: bool,
) -> IndexerResult<Vec<IndexerTableChangeSet>> {
let (tx_order, state_index) = if let Some(cursor) = cursor {
let IndexerStateID {
tx_order,
state_index,
} = cursor;
(tx_order as i64, state_index as i64)
} else if descending_order {
let (max_tx_order, state_index): (i64, i64) = self
.get_inner_indexer_reader(INDEXER_TABLE_CHANGE_SETS_TABLE_NAME)?
.run_query(|conn| {
table_change_sets::dsl::table_change_sets
.select((table_change_sets::tx_order, table_change_sets::state_index))
.order_by((
table_change_sets::tx_order.desc(),
table_change_sets::state_index.desc(),
))
.first::<(i64, i64)>(conn)
})?;
(max_tx_order + 1, state_index)
} else {
(-1, 0)
};

let main_where_clause_opt = filter.map(|f| match f {
StateSyncFilter::ObjectId(object_id) => {
format!("{STATE_OBJECT_ID_STR} = \"{}\"", object_id)
}
});
let cursor_clause = if descending_order {
format!(
" ({TX_ORDER_STR} < {} OR ({TX_ORDER_STR} = {} AND {STATE_INDEX_STR} < {}))",
tx_order, tx_order, state_index
)
} else {
format!(
" ({TX_ORDER_STR} > {} OR ({TX_ORDER_STR} = {} AND {STATE_INDEX_STR} > {}))",
tx_order, tx_order, state_index
)
};
let where_clause = match main_where_clause_opt {
Some(main_where_clause) => format!(" {} AND {} ", main_where_clause, cursor_clause),
None => format!(" {} ", cursor_clause),
};

let order_clause = if descending_order {
format!("{TX_ORDER_STR} DESC, {STATE_INDEX_STR} DESC")
} else {
format!("{TX_ORDER_STR} ASC, {STATE_INDEX_STR} ASC")
};

let query = format!(
"
SELECT * FROM table_change_sets \
WHERE {} \
ORDER BY {} \
LIMIT {}
",
where_clause, order_clause, limit,
);

tracing::debug!("sync states: {}", query);
let stored_table_change_sets = self
.get_inner_indexer_reader(INDEXER_TABLE_CHANGE_SETS_TABLE_NAME)?
.run_query(|conn| diesel::sql_query(query).load::<StoredTableChangeSet>(conn))?;

let result = stored_table_change_sets
.into_iter()
.map(|t| t.try_into_indexer_state_change_set())
.collect::<Result<Vec<_>>>()
.map_err(|e| {
IndexerError::SQLiteReadError(format!(
"Cast indexer table change sets failed: {:?}",
e
))
})?;

Ok(result)
}
}
16 changes: 3 additions & 13 deletions crates/rooch-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use diesel::sqlite::SqliteConnection;

use crate::store::sqlite_store::SqliteIndexerStore;
use crate::store::traits::IndexerStoreTrait;
use crate::types::{
IndexedEvent, IndexedFieldState, IndexedObjectState, IndexedTableChangeSet, IndexedTransaction,
};
use crate::types::{IndexedEvent, IndexedFieldState, IndexedObjectState, IndexedTransaction};
use crate::utils::create_all_tables_if_not_exists;
use errors::IndexerError;
use rooch_config::indexer_config::ROOCH_INDEXER_DB_DIR;
Expand All @@ -38,7 +36,7 @@ pub mod utils;
pub type IndexerTableName = &'static str;
pub const INDEXER_EVENTS_TABLE_NAME: IndexerTableName = "events";
pub const INDEXER_OBJECT_STATES_TABLE_NAME: IndexerTableName = "object_states";
pub const INDEXER_TABLE_CHANGE_SETS_TABLE_NAME: IndexerTableName = "table_change_sets";
// pub const INDEXER_TABLE_CHANGE_SETS_TABLE_NAME: IndexerTableName = "object_changes";
pub const INDEXER_FIELD_STATES_TABLE_NAME: IndexerTableName = "field_states";
pub const INDEXER_TRANSACTIONS_TABLE_NAME: IndexerTableName = "transactions";

Expand All @@ -47,7 +45,7 @@ static INDEXER_VEC_TABLE_NAME: Lazy<Vec<IndexerTableName>> = Lazy::new(|| {
vec![
INDEXER_EVENTS_TABLE_NAME,
INDEXER_OBJECT_STATES_TABLE_NAME,
INDEXER_TABLE_CHANGE_SETS_TABLE_NAME,
// INDEXER_TABLE_CHANGE_SETS_TABLE_NAME,
INDEXER_FIELD_STATES_TABLE_NAME,
INDEXER_TRANSACTIONS_TABLE_NAME,
]
Expand Down Expand Up @@ -177,14 +175,6 @@ impl IndexerStoreTrait for IndexerStore {
.delete_field_states_by_object_id(object_ids)
}

fn persist_table_change_sets(
&self,
table_change_sets: Vec<IndexedTableChangeSet>,
) -> Result<(), IndexerError> {
self.get_sqlite_store(INDEXER_TABLE_CHANGE_SETS_TABLE_NAME)?
.persist_table_change_sets(table_change_sets)
}

fn persist_transactions(
&self,
transactions: Vec<IndexedTransaction>,
Expand Down
57 changes: 2 additions & 55 deletions crates/rooch-indexer/src/models/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@

use crate::schema::field_states;
use crate::schema::object_states;
use crate::schema::table_change_sets;
use crate::types::{IndexedFieldState, IndexedObjectState, IndexedTableChangeSet};
use crate::types::{IndexedFieldState, IndexedObjectState};
use diesel::prelude::*;
use move_core_types::account_address::AccountAddress;
use move_core_types::language_storage::{StructTag, TypeTag};
use moveos_types::moveos_std::object::ObjectID;
use rooch_rpc_api::jsonrpc_types::TableChangeSetView;
use rooch_types::indexer::state::{IndexerFieldState, IndexerObjectState, IndexerTableChangeSet};
use rooch_types::indexer::state::{IndexerFieldState, IndexerObjectState};
use std::str::FromStr;

#[derive(Queryable, QueryableByName, Insertable, Debug, Clone)]
Expand Down Expand Up @@ -166,54 +164,3 @@ impl StoredFieldState {
Ok(state)
}
}

#[derive(Clone, Debug, Queryable, Insertable, QueryableByName)]
#[diesel(table_name = table_change_sets)]
pub struct StoredTableChangeSet {
/// The tx order of this transaction which produce the table change set
#[diesel(sql_type = diesel::sql_types::BigInt)]
pub tx_order: i64,
/// The table handle index in the tx
#[diesel(sql_type = diesel::sql_types::BigInt)]
pub state_index: i64,
/// The table handle
#[diesel(sql_type = diesel::sql_types::Text)]
pub table_handle: String,
/// The table change set, json format
#[diesel(sql_type = diesel::sql_types::Text)]
pub table_change_set: String,
/// The tx executed timestamp on chain
#[diesel(sql_type = diesel::sql_types::BigInt)]
pub created_at: i64,
}

impl From<IndexedTableChangeSet> for StoredTableChangeSet {
fn from(state_change_set: IndexedTableChangeSet) -> Self {
Self {
tx_order: state_change_set.tx_order as i64,
state_index: state_change_set.state_index as i64,
table_handle: state_change_set.table_handle.to_string(),
table_change_set: state_change_set.table_change_set,
created_at: state_change_set.created_at as i64,
}
}
}

impl StoredTableChangeSet {
pub fn try_into_indexer_state_change_set(
&self,
) -> Result<IndexerTableChangeSet, anyhow::Error> {
let table_handle = ObjectID::from_str(self.table_handle.as_str())?;
let table_change_set: TableChangeSetView =
serde_json::from_str(self.table_change_set.as_str())?;

let indexer_state_change_set = IndexerTableChangeSet {
tx_order: self.tx_order as u64,
state_index: self.state_index as u64,
table_handle,
table_change_set: table_change_set.into(),
created_at: self.created_at as u64,
};
Ok(indexer_state_change_set)
}
}
Loading

0 comments on commit 4eb3547

Please sign in to comment.