diff --git a/chaindexing-tests/src/tests/events_ingester.rs b/chaindexing-tests/src/tests/events_ingester.rs
index e2bd9bc..03d64ed 100644
--- a/chaindexing-tests/src/tests/events_ingester.rs
+++ b/chaindexing-tests/src/tests/events_ingester.rs
@@ -10,7 +10,9 @@ mod tests {
     use crate::{
         json_rpc_with_empty_logs, json_rpc_with_filter_stubber, json_rpc_with_logs, test_runner,
     };
-    use chaindexing::{Chaindexing, EventsIngester, PostgresRepo, Repo};
+    use chaindexing::{
+        Chain, Chaindexing, EventsIngester, MinConfirmationCount, PostgresRepo, Repo,
+    };
 
     #[tokio::test]
     pub async fn creates_contract_events() {
@@ -28,7 +30,17 @@ mod tests {
         Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;
 
         let conn = Arc::new(Mutex::new(conn));
-        EventsIngester::ingest(conn.clone(), &contracts, 10, json_rpc).await.unwrap();
+        EventsIngester::ingest(
+            conn.clone(),
+            &contracts,
+            10,
+            json_rpc,
+            &Chain::Mainnet,
+            &MinConfirmationCount::new(1),
+            false,
+        )
+        .await
+        .unwrap();
 
         let mut conn = conn.lock().await;
         let ingested_events = PostgresRepo::get_all_events(&mut conn).await;
@@ -66,7 +78,17 @@ mod tests {
             ));
 
             let conn = Arc::new(Mutex::new(conn));
-            EventsIngester::ingest(conn, &contracts, 10, json_rpc).await.unwrap();
+            EventsIngester::ingest(
+                conn,
+                &contracts,
+                10,
+                json_rpc,
+                &Chain::Mainnet,
+                &MinConfirmationCount::new(1),
+                false,
+            )
+            .await
+            .unwrap();
         })
         .await;
     }
@@ -87,9 +109,18 @@ mod tests {
         let conn = Arc::new(Mutex::new(conn));
         let blocks_per_batch = 10;
-        EventsIngester::ingest(conn.clone(), &contracts, blocks_per_batch, json_rpc)
-            .await
-            .unwrap();
+
+        EventsIngester::ingest(
+            conn.clone(),
+            &contracts,
+            blocks_per_batch,
+            json_rpc,
+            &Chain::Mainnet,
+            &MinConfirmationCount::new(1),
+            false,
+        )
+        .await
+        .unwrap();
 
         let mut conn = conn.lock().await;
         let contract_addresses = PostgresRepo::get_all_contract_addresses(&mut conn).await;
 
@@ -117,9 +148,17 @@ mod tests {
             let json_rpc = Arc::new(empty_json_rpc());
             let blocks_per_batch = 10;
             let conn = Arc::new(Mutex::new(conn));
-            EventsIngester::ingest(conn.clone(), &contracts, blocks_per_batch, json_rpc)
-                .await
-                .unwrap();
+            EventsIngester::ingest(
+                conn.clone(),
+                &contracts,
+                blocks_per_batch,
+                json_rpc,
+                &Chain::Mainnet,
+                &MinConfirmationCount::new(1),
+                false,
+            )
+            .await
+            .unwrap();
             let mut conn = conn.lock().await;
             assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
         })
@@ -138,7 +177,17 @@ mod tests {
         Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;
 
         let conn = Arc::new(Mutex::new(conn));
-        EventsIngester::ingest(conn.clone(), &contracts, 10, json_rpc).await.unwrap();
+        EventsIngester::ingest(
+            conn.clone(),
+            &contracts,
+            10,
+            json_rpc,
+            &Chain::Mainnet,
+            &MinConfirmationCount::new(1),
+            false,
+        )
+        .await
+        .unwrap();
 
         let mut conn = conn.lock().await;
         assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
diff --git a/chaindexing/src/chain_reorg.rs b/chaindexing/src/chain_reorg.rs
new file mode 100644
index 0000000..d5b5917
--- /dev/null
+++ b/chaindexing/src/chain_reorg.rs
@@ -0,0 +1,26 @@
+use std::cmp::max;
+
+/// Tolerance for chain re-organization
+#[derive(Clone)]
+pub struct MinConfirmationCount {
+    value: u8,
+}
+
+impl MinConfirmationCount {
+    pub fn new(value: u8) -> Self {
+        Self { value }
+    }
+
+    pub fn ideduct_from(&self, block_number: i64, start_block_number: i64) -> u64 {
+        self.deduct_from(block_number as u64, start_block_number as u64)
+    }
+
+    pub fn deduct_from(&self, block_number: u64, start_block_number: u64) -> u64 {
+        max(start_block_number, block_number - (self.value as u64))
+    }
+}
+
+pub enum Execution<'a> {
+    Main,
+    Confirmation(&'a MinConfirmationCount),
+}
diff --git a/chaindexing/src/config.rs b/chaindexing/src/config.rs
index a207ab0..9c79c61 100644
--- a/chaindexing/src/config.rs
+++ b/chaindexing/src/config.rs
@@ -1,12 +1,11 @@
-use crate::{ChaindexingRepo, Chains, Contract};
+use crate::{ChaindexingRepo, Chains, Contract, MinConfirmationCount};
 
 #[derive(Clone)]
 pub struct Config {
     pub chains: Chains,
     pub repo: ChaindexingRepo,
     pub contracts: Vec<Contract>,
-    /// Tolerance for chain re-organization
-    pub min_confirmation_count: u8,
+    pub min_confirmation_count: MinConfirmationCount,
     pub blocks_per_batch: u64,
     pub handler_interval_ms: u64,
     pub ingestion_interval_ms: u64,
@@ -19,7 +18,7 @@ impl Config {
             repo,
             chains,
             contracts: vec![],
-            min_confirmation_count: 40,
+            min_confirmation_count: MinConfirmationCount::new(40),
             blocks_per_batch: 20,
             handler_interval_ms: 10000,
             ingestion_interval_ms: 10000,
@@ -40,7 +39,7 @@ impl Config {
     }
 
     pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self {
-        self.min_confirmation_count = min_confirmation_count;
+        self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count);
         self
     }
diff --git a/chaindexing/src/contract_states/migrations.rs b/chaindexing/src/contract_states/migrations.rs
index 2386fc5..dbf9019 100644
--- a/chaindexing/src/contract_states/migrations.rs
+++ b/chaindexing/src/contract_states/migrations.rs
@@ -12,8 +12,6 @@ pub trait ContractStateMigrations: Send + Sync {
             validate_migration(user_migration);
 
             if user_migration.starts_with("CREATE TABLE IF NOT EXISTS") {
-                // FUTURE TODO: Index user migration fields
-                // Index State Version fields
                 let create_state_table_migration = user_migration.to_owned().to_owned();
                 let create_state_versions_table_migration = append_migration(
diff --git a/chaindexing/src/contracts.rs b/chaindexing/src/contracts.rs
index d1158a1..480b7a7 100644
--- a/chaindexing/src/contracts.rs
+++ b/chaindexing/src/contracts.rs
@@ -1,9 +1,7 @@
 use std::{collections::HashMap, str::FromStr, sync::Arc};
 
-use crate::{
-    diesels::schema::chaindexing_contract_addresses, event_handlers::EventHandler,
-    ContractStateMigrations,
-};
+use crate::diesels::schema::chaindexing_contract_addresses;
+use crate::{ContractStateMigrations, EventHandler};
 
 use diesel::{Identifiable, Insertable, Queryable};
 use ethers::{
@@ -100,10 +98,6 @@ impl Contract {
 pub struct Contracts;
 
 impl Contracts {
-    pub fn get_contract_addresses(contracts: &Vec<Contract>) -> Vec<UnsavedContractAddress> {
-        contracts.into_iter().flat_map(|c| c.addresses.clone()).collect()
-    }
-
     pub fn get_state_migrations(
         contracts: &Vec<Contract>,
     ) -> Vec<Arc<dyn ContractStateMigrations>> {
@@ -198,7 +192,7 @@ pub struct ContractAddress {
     chain_id: i32,
     pub next_block_number_to_ingest_from: i64,
     pub next_block_number_to_handle_from: i64,
-    start_block_number: i64,
+    pub start_block_number: i64,
     pub address: String,
     pub contract_name: String,
 }
@@ -211,24 +205,3 @@ impl ContractAddress {
         serde_json::to_value(address).unwrap().as_str().unwrap().to_string()
     }
 }
-
-pub struct ContractAddresses;
-
-impl ContractAddresses {
-    pub fn group_by_addresses(
-        contract_addresses: &Vec<UnsavedContractAddress>,
-    ) -> HashMap<Address, &UnsavedContractAddress> {
-        contract_addresses.iter().fold(
-            HashMap::new(),
-            |mut contract_addresses_by_addresses,
-             contract_address @ UnsavedContractAddress { address, .. }| {
-                contract_addresses_by_addresses.insert(
-                    Address::from_str(&*address.as_str()).unwrap(),
-                    contract_address,
-                );
-
-                contract_addresses_by_addresses
-            },
-        )
-    }
-}
diff --git a/chaindexing/src/events.rs b/chaindexing/src/events.rs
index d1c193d..d2adeeb 100644
--- a/chaindexing/src/events.rs
+++ b/chaindexing/src/events.rs
@@ -1,15 +1,16 @@
 use std::collections::HashMap;
+use std::hash::{Hash, Hasher};
 
-use crate::contracts::{ContractAddress, ContractAddresses, Contracts};
+use crate::contracts::{ContractAddress, Contracts};
 use crate::diesels::schema::chaindexing_events;
 use diesel::{Insertable, Queryable};
 use ethers::abi::{LogParam, Token};
-use ethers::types::Log;
+use ethers::types::{Chain, Log};
 
 use crate::{Contract, ContractEvent};
 use uuid::Uuid;
 
-#[derive(Debug, Clone, PartialEq, Queryable, Insertable)]
+#[derive(Debug, Clone, Eq, Queryable, Insertable)]
 #[diesel(table_name = chaindexing_events)]
 pub struct Event {
     pub id: Uuid,
@@ -29,14 +30,34 @@ pub struct Event {
     inserted_at: chrono::NaiveDateTime,
 }
 
+impl PartialEq for Event {
+    fn eq(&self, other: &Self) -> bool {
+        self.chain_id == other.chain_id
+            && self.contract_address == other.contract_address
+            && self.abi == other.abi
+            && self.log_params == other.log_params
+            && self.block_hash == other.block_hash
+    }
+}
+
+impl Hash for Event {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        self.chain_id.hash(state);
+        self.contract_address.hash(state);
+        self.abi.hash(state);
+        self.log_params.to_string().hash(state);
+        self.block_hash.hash(state);
+    }
+}
+
 impl Event {
-    pub fn new(log: &Log, event: &ContractEvent, contract_name: &str, chain_id: i32) -> Self {
+    pub fn new(log: &Log, event: &ContractEvent, contract_name: &str, chain: &Chain) -> Self {
         let log_params = event.value.parse_log(log.clone().into()).unwrap().params;
         let parameters = Self::log_params_to_parameters(&log_params);
 
         Self {
             id: uuid::Uuid::new_v4(),
-            chain_id,
+            chain_id: *chain as i32,
             contract_address: ContractAddress::address_to_string(&log.address),
             contract_name: contract_name.to_string(),
             abi: event.abi.clone(),
@@ -77,14 +98,10 @@ impl Event {
 pub struct Events;
 
 impl Events {
-    pub fn new(logs: &Vec<Log>, contracts: &Vec<Contract>) -> Vec<Event> {
+    pub fn new(logs: &Vec<Log>, contracts: &Vec<Contract>, chain: &Chain) -> Vec<Event> {
         let events_by_topics = Contracts::group_events_by_topics(contracts);
         let contracts_by_addresses = Contracts::group_by_addresses(contracts);
 
-        let contract_addresses = Contracts::get_contract_addresses(contracts);
-        let contract_addresses_by_addresses =
-            ContractAddresses::group_by_addresses(&contract_addresses);
-
         logs.iter()
             .map(
                 |log @ Log {
@@ -94,7 +111,7 @@ impl Events {
                         log,
                         &events_by_topics.get(&topics[0]).unwrap(),
                         &contracts_by_addresses.get(&address).unwrap().name,
-                        contract_addresses_by_addresses.get(&address).unwrap().chain_id,
+                        chain,
                     )
                 },
             )
diff --git a/chaindexing/src/events_ingester.rs b/chaindexing/src/events_ingester.rs
index 1e79e66..ce5bbf5 100644
--- a/chaindexing/src/events_ingester.rs
+++ b/chaindexing/src/events_ingester.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -9,13 +9,18 @@
 use ethers::types::{Address, Filter as EthersFilter, Log};
 use futures_util::future::try_join_all;
 use futures_util::FutureExt;
 use futures_util::StreamExt;
+use std::cmp::{max, min};
 use tokio::sync::Mutex;
 use tokio::time::interval;
 
+use crate::chain_reorg::Execution;
 use crate::contracts::Contract;
 use crate::contracts::{ContractEventTopic, Contracts};
-use crate::events::Events;
-use crate::{ChaindexingRepo, ChaindexingRepoConn, Config, ContractAddress, Repo, Streamable};
+use crate::events::{Event, Events};
+use crate::{
+    ChaindexingRepo, ChaindexingRepoConn, Config, ContractAddress, MinConfirmationCount, Repo,
+    RepoError, Streamable,
+};
 
 #[async_trait::async_trait]
 pub trait EventsIngesterJsonRpc: Clone + Sync + Send {
@@ -37,6 +42,7 @@ impl EventsIngesterJsonRpc for Provider<Http> {
 #[derive(Debug)]
 pub enum EventsIngesterError {
     HTTPError(String),
+    RepoConnectionError,
     GenericError(String),
 }
 
@@ -49,6 +55,15 @@ impl From<ProviderError> for EventsIngesterError {
     }
 }
 
+impl From<RepoError> for EventsIngesterError {
+    fn from(value: RepoError) -> Self {
+        match value {
+            RepoError::NotConnected => EventsIngesterError::RepoConnectionError,
+            RepoError::Unknown(error) => EventsIngesterError::GenericError(error),
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct EventsIngester;
 
@@ -61,27 +76,42 @@ impl EventsIngester {
         let conn = Arc::new(Mutex::new(conn));
         let contracts = config.contracts.clone();
         let mut interval = interval(Duration::from_millis(config.ingestion_interval_ms));
+        let mut run_confirmation_execution = false;
 
         loop {
             interval.tick().await;
 
-            for (_chain, json_rpc_url) in config.chains.clone() {
+            for (chain, json_rpc_url) in config.chains.clone() {
                 let json_rpc = Arc::new(Provider::<Http>::try_from(json_rpc_url).unwrap());
 
-                Self::ingest(conn.clone(), &contracts, config.blocks_per_batch, json_rpc)
-                    .await
-                    .unwrap();
+                Self::ingest(
+                    conn.clone(),
+                    &contracts,
+                    config.blocks_per_batch,
+                    json_rpc,
+                    &chain,
+                    &config.min_confirmation_count,
+                    run_confirmation_execution,
+                )
+                .await
+                .unwrap();
+            }
+
+            if !run_confirmation_execution {
+                run_confirmation_execution = true;
+            }
         }
         });
     }
 
-    // TODO: Can Arc<> or just raw &mut Conn work?
     pub async fn ingest<'a>(
         conn: Arc<Mutex<ChaindexingRepoConn<'a>>>,
         contracts: &Vec<Contract>,
         blocks_per_batch: u64,
         json_rpc: Arc<impl EventsIngesterJsonRpc>,
+        chain: &Chain,
+        min_confirmation_count: &MinConfirmationCount,
+        run_confirmation_execution: bool,
     ) -> Result<(), EventsIngesterError> {
         let current_block_number = (json_rpc.get_block_number().await?).as_u64();
@@ -93,18 +123,75 @@ impl EventsIngester {
                 &contract_addresses,
                 current_block_number,
             );
+            let mut conn = conn.lock().await;
 
-            let filters = Filters::new(
-                &contract_addresses,
-                &contracts,
+            MainExecution::run(
+                &mut conn,
+                contract_addresses.clone(),
+                contracts,
+                &json_rpc,
+                chain,
                 current_block_number,
                 blocks_per_batch,
-            );
-            let logs = Self::fetch_logs(&filters, &json_rpc).await.unwrap();
+            )
+            .await?;
+
+            if run_confirmation_execution {
+                ConfirmationExecution::run(
+                    &mut conn,
+                    contract_addresses.clone(),
+                    contracts,
+                    &json_rpc,
+                    chain,
+                    current_block_number,
+                    blocks_per_batch,
+                    min_confirmation_count,
+                )
+                .await?;
+            }
+        }
 
-            let events = Events::new(&logs, &contracts);
+        Ok(())
+    }
 
-            ChaindexingRepo::run_in_transaction(&mut conn, move |conn| {
+    fn filter_uningested_contract_addresses(
+        contract_addresses: &Vec<ContractAddress>,
+        current_block_number: u64,
+    ) -> Vec<ContractAddress> {
+        contract_addresses
+            .to_vec()
+            .into_iter()
+            .filter(|ca| current_block_number > ca.next_block_number_to_ingest_from as u64)
+            .collect()
+    }
+}
+
+struct MainExecution;
+
+impl MainExecution {
+    async fn run<'a>(
+        conn: &mut ChaindexingRepoConn<'a>,
+        contract_addresses: Vec<ContractAddress>,
+        contracts: &Vec<Contract>,
+        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
+        chain: &Chain,
+        current_block_number: u64,
+        blocks_per_batch: u64,
+    ) -> Result<(), EventsIngesterError> {
+        let filters = Filters::new(
+            &contract_addresses,
+            &contracts,
+            current_block_number,
+            blocks_per_batch,
+            &Execution::Main,
+        );
+
+        if !filters.is_empty() {
+            let logs = fetch_logs(&filters, json_rpc).await.unwrap();
+            let events = Events::new(&logs, &contracts, chain);
+
+            ChaindexingRepo::run_in_transaction(conn, move |conn| {
                 async move {
                     ChaindexingRepo::create_events(conn, &events.clone()).await;
@@ -119,7 +206,7 @@ impl EventsIngester {
                 }
                 .boxed()
             })
-            .await;
+            .await?;
         }
 
         Ok(())
@@ -148,29 +235,111 @@ impl EventsIngester {
             }
         }
     }
+}
 
-    fn filter_uningested_contract_addresses(
-        contract_addresses: &Vec<ContractAddress>,
+struct ConfirmationExecution;
+
+impl ConfirmationExecution {
+    async fn run<'a>(
+        conn: &mut ChaindexingRepoConn<'a>,
+        contract_addresses: Vec<ContractAddress>,
+        contracts: &Vec<Contract>,
+        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
+        chain: &Chain,
         current_block_number: u64,
-    ) -> Vec<ContractAddress> {
-        contract_addresses
-            .to_vec()
-            .into_iter()
-            .filter(|ca| current_block_number > ca.next_block_number_to_ingest_from as u64)
-            .collect()
+        blocks_per_batch: u64,
+        min_confirmation_count: &MinConfirmationCount,
+    ) -> Result<(), EventsIngesterError> {
+        let filters = Filters::new(
+            &contract_addresses,
+            &contracts,
+            current_block_number,
+            blocks_per_batch,
+            &Execution::Confirmation(min_confirmation_count),
+        );
+
+        if !filters.is_empty() {
+            let logs = fetch_logs(&filters, json_rpc).await.unwrap();
+
+            if let Some((min_block_number, max_block_number)) =
+                Self::get_min_and_max_block_number(&logs)
+            {
+                let already_ingested_events =
+                    ChaindexingRepo::get_events(conn, min_block_number, max_block_number).await;
+
+                let json_rpc_events = Events::new(&logs, &contracts, chain);
+
+                let (added_events, removed_events) = Self::get_json_rpc_added_and_removed_events(
+                    &already_ingested_events,
+                    &json_rpc_events,
+                );
+
+                ChaindexingRepo::run_in_transaction(conn, move |conn| {
+                    async move {
+                        let event_ids = removed_events.iter().map(|e| e.id).collect();
+                        ChaindexingRepo::delete_events_by_ids(conn, &event_ids).await;
+
+                        ChaindexingRepo::create_events(conn, &added_events).await;
+
+                        Ok(())
+                    }
+                    .boxed()
+                })
+                .await?;
+            }
+        }
+
+        Ok(())
     }
 
-    async fn fetch_logs(
-        filters: &Vec<Filter>,
-        json_rpc: &Arc<impl EventsIngesterJsonRpc>,
-    ) -> Result<Vec<Log>, EventsIngesterError> {
-        let logs_per_filter =
-            try_join_all(filters.iter().map(|f| json_rpc.get_logs(&f.value))).await?;
+    fn get_min_and_max_block_number(logs: &Vec<Log>) -> Option<(u64, u64)> {
+        logs.clone().iter().fold(None, |block_range, Log { block_number, .. }| {
+            let block_number = block_number.unwrap().as_u64();
 
-        Ok(logs_per_filter.into_iter().flatten().collect())
+            block_range
+                .and_then(|(min_block_number, max_block_number)| {
+                    let min_block_number = min(min_block_number, block_number);
+                    let max_block_number = max(max_block_number, block_number);
+
+                    Some((min_block_number, max_block_number))
+                })
+                .or_else(|| Some((block_number, block_number)))
+        })
+    }
+
+    fn get_json_rpc_added_and_removed_events(
+        already_ingested_events: &Vec<Event>,
+        json_rpc_events: &Vec<Event>,
+    ) -> (Vec<Event>, Vec<Event>) {
+        let already_ingested_events_set: HashSet<_> =
+            already_ingested_events.clone().into_iter().collect();
+        let json_rpc_events_set: HashSet<_> = json_rpc_events.clone().into_iter().collect();
+
+        let json_rpc_added_events = json_rpc_events
+            .clone()
+            .into_iter()
+            .filter(|e| !already_ingested_events_set.contains(e))
+            .collect();
+
+        let json_rpc_removed_events = already_ingested_events
+            .clone()
+            .into_iter()
+            .filter(|e| !json_rpc_events_set.contains(e))
+            .collect();
+
+        (json_rpc_added_events, json_rpc_removed_events)
     }
 }
 
+async fn fetch_logs(
+    filters: &Vec<Filter>,
+    json_rpc: &Arc<impl EventsIngesterJsonRpc>,
+) -> Result<Vec<Log>, EventsIngesterError> {
+    let logs_per_filter = try_join_all(filters.iter().map(|f| json_rpc.get_logs(&f.value))).await?;
+
+    Ok(logs_per_filter.into_iter().flatten().collect())
+}
+
 #[derive(Clone, Debug)]
 struct Filter {
     contract_address_id: i32,
@@ -183,20 +352,36 @@ impl Filter {
         topics: &Vec<ContractEventTopic>,
         current_block_number: u64,
         blocks_per_batch: u64,
+        execution: &Execution,
     ) -> Filter {
-        let next_block_number_to_ingest_from =
-            contract_address.next_block_number_to_ingest_from as u64;
+        let ContractAddress {
+            id: contract_address_id,
+            next_block_number_to_ingest_from,
+            start_block_number,
+            address,
+            ..
+        } = contract_address;
+
+        let next_block_number_to_ingest_from = match execution {
+            Execution::Main => *next_block_number_to_ingest_from as u64,
+            Execution::Confirmation(min_confirmation_count) => min_confirmation_count
+                .ideduct_from(*next_block_number_to_ingest_from, *start_block_number),
+        };
+
+        let current_block_number = match execution {
+            Execution::Main => current_block_number,
+            Execution::Confirmation(min_confirmation_count) => {
+                min_confirmation_count.deduct_from(current_block_number, *start_block_number as u64)
+            }
+        };
 
         Filter {
-            contract_address_id: contract_address.id,
+            contract_address_id: *contract_address_id,
             value: EthersFilter::new()
-                // We could use multiple adddresses here but
-                // we'll rather not because it would affect the correctness of
-                // next_block_number_to_ingest_from since we stream the contracts upstream.
-                .address(contract_address.address.parse::<Address>().unwrap())
+                .address(address.parse::<Address>().unwrap())
                 .topic0(topics.to_vec())
                 .from_block(next_block_number_to_ingest_from)
-                .to_block(std::cmp::min(
+                .to_block(min(
                     next_block_number_to_ingest_from + blocks_per_batch,
                     current_block_number,
                 )),
@@ -212,6 +397,7 @@ impl Filters {
         contracts: &Vec<Contract>,
         current_block_number: u64,
         blocks_per_batch: u64,
+        execution: &Execution,
     ) -> Vec<Filter> {
         let topics_by_contract_name = Contracts::group_event_topics_by_names(contracts);
@@ -226,8 +412,10 @@ impl Filters {
                     topics_by_contract_name,
                     current_block_number,
                     blocks_per_batch,
+                    execution,
                 )
             })
+            .filter(|f| !f.value.get_from_block().eq(&f.value.get_to_block()))
             .collect()
     }
diff --git a/chaindexing/src/lib.rs b/chaindexing/src/lib.rs
index 12d7266..272d68e 100644
--- a/chaindexing/src/lib.rs
+++ b/chaindexing/src/lib.rs
@@ -1,3 +1,4 @@
+mod chain_reorg;
 mod chains;
 mod config;
 mod contract_states;
@@ -9,6 +10,7 @@ mod events_ingester;
 mod repos;
 mod reset_counts;
 
+pub use chain_reorg::MinConfirmationCount;
 pub use chains::Chains;
 pub use config::Config;
 pub use contract_states::{ContractState, ContractStateError, ContractStateMigrations};
@@ -83,11 +85,16 @@ impl Chaindexing {
         client: &ChaindexingRepoRawQueryClient,
         conn: &mut ChaindexingRepoConn<'a>,
     ) {
+        let reset_count = *reset_count as usize;
         let reset_counts = ChaindexingRepo::get_reset_counts(conn).await;
-        if *reset_count as usize > reset_counts.len() {
+        let previous_reset_count = reset_counts.len();
+
+        if reset_count > previous_reset_count {
             Self::reset_internal_migrations(client).await;
             Self::reset_migrations_for_contract_states(client, contracts).await;
-            ChaindexingRepo::create_reset_count(conn).await;
+            for _ in previous_reset_count..reset_count {
+                ChaindexingRepo::create_reset_count(conn).await;
+            }
         }
     }
diff --git a/chaindexing/src/repos.rs b/chaindexing/src/repos.rs
index c6fc589..9d3b954 100644
--- a/chaindexing/src/repos.rs
+++ b/chaindexing/src/repos.rs
@@ -6,6 +6,6 @@ pub use postgres_repo::{
     PostgresRepoRawQueryClient, PostgresRepoRawQueryTxnClient,
 };
 pub use repo::{
-    ExecutesWithRawQuery, HasRawQueryClient, LoadsDataWithRawQuery, Migratable, Repo,
+    ExecutesWithRawQuery, HasRawQueryClient, LoadsDataWithRawQuery, Migratable, Repo, RepoError,
     RepoMigrations, SQLikeMigrations, Streamable,
 };
diff --git a/chaindexing/src/repos/postgres_repo.rs b/chaindexing/src/repos/postgres_repo.rs
index 8601531..c6e3a64 100644
--- a/chaindexing/src/repos/postgres_repo.rs
+++ b/chaindexing/src/repos/postgres_repo.rs
@@ -10,13 +10,19 @@ use crate::{
 };
 
 use diesel_async::RunQueryDsl;
-use diesel::{result::Error, upsert::excluded, ExpressionMethods};
+use diesel::{
+    delete,
+    result::{DatabaseErrorKind, Error as DieselError},
+    upsert::excluded,
+    ExpressionMethods, QueryDsl,
+};
 use diesel_async::{pooled_connection::AsyncDieselConnectionManager, AsyncPgConnection};
 use diesel_streamer::get_serial_table_async_stream;
 use futures_core::{future::BoxFuture, Stream};
 use tokio::sync::Mutex;
+use uuid::Uuid;
 
-use super::repo::Repo;
+use super::repo::{Repo, RepoError};
 
 pub type Conn<'a> = bb8::PooledConnection<'a, AsyncDieselConnectionManager<AsyncPgConnection>>;
 pub type Pool = bb8::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>;
@@ -28,6 +34,17 @@ pub use diesel_async::{
 
 pub use raw_queries::{PostgresRepoRawQueryClient, PostgresRepoRawQueryTxnClient};
 
+impl From<DieselError> for RepoError {
+    fn from(value: DieselError) -> Self {
+        match value {
+            DieselError::DatabaseError(DatabaseErrorKind::ClosedConnection, _info) => {
+                RepoError::NotConnected
+            }
+            any_other_error => RepoError::Unknown(any_other_error.to_string()),
+        }
+    }
+}
+
 #[derive(Clone)]
 pub struct PostgresRepo {
     url: String,
@@ -56,20 +73,17 @@ impl Repo for PostgresRepo {
         pool.get().await.unwrap()
     }
 
-    async fn run_in_transaction<'a, F>(conn: &mut Conn<'a>, repo_ops: F)
+    async fn run_in_transaction<'a, F>(conn: &mut Conn<'a>, repo_ops: F) -> Result<(), RepoError>
     where
-        F: for<'b> FnOnce(&'b mut Conn<'a>) -> BoxFuture<'b, Result<(), ()>> + Send + Sync + 'a,
+        F: for<'b> FnOnce(&'b mut Conn<'a>) -> BoxFuture<'b, Result<(), RepoError>>
+            + Send
+            + Sync
+            + 'a,
     {
-        conn.transaction::<(), Error, _>(|transaction_conn| {
-            async move {
-                (repo_ops)(transaction_conn).await.unwrap();
-
-                Ok(())
-            }
-            .scope_boxed()
+        conn.transaction::<(), RepoError, _>(|transaction_conn| {
+            async move { (repo_ops)(transaction_conn).await }.scope_boxed()
         })
         .await
-        .unwrap();
     }
 
     async fn create_contract_addresses<'a>(
@@ -105,12 +119,25 @@ impl Repo for PostgresRepo {
         .await
         .unwrap();
     }
-
     async fn get_all_events<'a>(conn: &mut Conn<'a>) -> Vec<Event> {
         use crate::diesels::schema::chaindexing_events::dsl::*;
 
         chaindexing_events.load(conn).await.unwrap()
     }
+    async fn get_events<'a>(conn: &mut Self::Conn<'a>, from: u64, to: u64) -> Vec<Event> {
+        use crate::diesels::schema::chaindexing_events::dsl::*;
+
+        chaindexing_events
+            .filter(block_number.between(from as i64, to as i64))
+            .load(conn)
+            .await
+            .unwrap()
+    }
+    async fn delete_events_by_ids<'a>(conn: &mut Self::Conn<'a>, ids: &Vec<Uuid>) {
+        use crate::diesels::schema::chaindexing_events::dsl::*;
+
+        delete(chaindexing_events).filter(id.eq_any(ids)).execute(conn).await.unwrap();
+    }
 
     async fn update_next_block_number_to_ingest_from<'a>(
         conn: &mut Self::Conn<'a>,
diff --git a/chaindexing/src/repos/repo.rs b/chaindexing/src/repos/repo.rs
index 46de76c..e340895 100644
--- a/chaindexing/src/repos/repo.rs
+++ b/chaindexing/src/repos/repo.rs
@@ -1,4 +1,6 @@
+use derive_more::Display;
 use std::sync::Arc;
+use uuid::Uuid;
 
 use futures_core::{future::BoxFuture, Stream};
 use serde::de::DeserializeOwned;
@@ -10,6 +12,12 @@ use crate::{
     ContractAddress, ResetCount,
 };
 
+#[derive(Debug, Display)]
+pub enum RepoError {
+    NotConnected,
+    Unknown(String),
+}
+
 #[async_trait::async_trait]
 pub trait Repo:
     Sync + Send + Migratable + Streamable + ExecutesWithRawQuery + LoadsDataWithRawQuery + Clone
@@ -21,9 +29,12 @@ pub trait Repo:
     async fn get_pool(&self, max_size: u32) -> Self::Pool;
     async fn get_conn<'a>(pool: &'a Self::Pool) -> Self::Conn<'a>;
 
-    async fn run_in_transaction<'a, F>(conn: &mut Self::Conn<'a>, repo_ops: F)
+    async fn run_in_transaction<'a, F>(
+        conn: &mut Self::Conn<'a>,
+        repo_ops: F,
+    ) -> Result<(), RepoError>
     where
-        F: for<'b> FnOnce(&'b mut Self::Conn<'a>) -> BoxFuture<'b, Result<(), ()>>
+        F: for<'b> FnOnce(&'b mut Self::Conn<'a>) -> BoxFuture<'b, Result<(), RepoError>>
             + Send
             + Sync
             + 'a;
@@ -36,6 +47,8 @@ pub trait Repo:
     async fn create_events<'a>(conn: &mut Self::Conn<'a>, events: &Vec<Event>);
     async fn get_all_events<'a>(conn: &mut Self::Conn<'a>) -> Vec<Event>;
+    async fn get_events<'a>(conn: &mut Self::Conn<'a>, from: u64, to: u64) -> Vec<Event>;
+    async fn delete_events_by_ids<'a>(conn: &mut Self::Conn<'a>, ids: &Vec<Uuid>);
 
     async fn update_next_block_number_to_ingest_from<'a>(
         conn: &mut Self::Conn<'a>,
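Reviewer note, not part of the patch: a minimal sketch of how the confirmation window introduced here behaves, using only MinConfirmationCount as exported by this diff. The block numbers are arbitrary illustrative values.

use chaindexing::MinConfirmationCount;

fn main() {
    // Config::new defaults to 40 confirmations in this patch.
    let min_confirmation_count = MinConfirmationCount::new(40);

    // The confirmation pass re-fetches logs starting 40 blocks behind the
    // block it would otherwise ingest from, so recently ingested events can
    // be re-checked against the node after a potential re-org...
    assert_eq!(min_confirmation_count.deduct_from(1_000, 500), 960);

    // ...but never earlier than the contract's start block, so a freshly
    // added contract is not scanned before it was deployed.
    assert_eq!(min_confirmation_count.deduct_from(510, 500), 500);
}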