Skip to content

Commit

Permalink
Merge pull request #20 from Jurshsmith/handle-chain-reorg-when-ingest…
Browse files Browse the repository at this point in the history
…ing-events

Handle chain re-org for EventsIngester
  • Loading branch information
Jurshsmith authored Oct 1, 2023
2 parents 8dde04a + 914790d commit c1d3fd2
Show file tree
Hide file tree
Showing 11 changed files with 412 additions and 115 deletions.
69 changes: 59 additions & 10 deletions chaindexing-tests/src/tests/events_ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ mod tests {
use crate::{
json_rpc_with_empty_logs, json_rpc_with_filter_stubber, json_rpc_with_logs, test_runner,
};
use chaindexing::{Chaindexing, EventsIngester, PostgresRepo, Repo};
use chaindexing::{
Chain, Chaindexing, EventsIngester, MinConfirmationCount, PostgresRepo, Repo,
};

#[tokio::test]
pub async fn creates_contract_events() {
Expand All @@ -28,7 +30,17 @@ mod tests {
Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;

let conn = Arc::new(Mutex::new(conn));
EventsIngester::ingest(conn.clone(), &contracts, 10, json_rpc).await.unwrap();
EventsIngester::ingest(
conn.clone(),
&contracts,
10,
json_rpc,
&Chain::Mainnet,
&MinConfirmationCount::new(1),
false,
)
.await
.unwrap();

let mut conn = conn.lock().await;
let ingested_events = PostgresRepo::get_all_events(&mut conn).await;
Expand Down Expand Up @@ -66,7 +78,17 @@ mod tests {
));

let conn = Arc::new(Mutex::new(conn));
EventsIngester::ingest(conn, &contracts, 10, json_rpc).await.unwrap();
EventsIngester::ingest(
conn,
&contracts,
10,
json_rpc,
&Chain::Mainnet,
&MinConfirmationCount::new(1),
false,
)
.await
.unwrap();
})
.await;
}
Expand All @@ -87,9 +109,18 @@ mod tests {

let conn = Arc::new(Mutex::new(conn));
let blocks_per_batch = 10;
EventsIngester::ingest(conn.clone(), &contracts, blocks_per_batch, json_rpc)
.await
.unwrap();

EventsIngester::ingest(
conn.clone(),
&contracts,
blocks_per_batch,
json_rpc,
&Chain::Mainnet,
&MinConfirmationCount::new(1),
false,
)
.await
.unwrap();

let mut conn = conn.lock().await;
let contract_addresses = PostgresRepo::get_all_contract_addresses(&mut conn).await;
Expand Down Expand Up @@ -117,9 +148,17 @@ mod tests {
let json_rpc = Arc::new(empty_json_rpc());
let blocks_per_batch = 10;
let conn = Arc::new(Mutex::new(conn));
EventsIngester::ingest(conn.clone(), &contracts, blocks_per_batch, json_rpc)
.await
.unwrap();
EventsIngester::ingest(
conn.clone(),
&contracts,
blocks_per_batch,
json_rpc,
&Chain::Mainnet,
&MinConfirmationCount::new(1),
false,
)
.await
.unwrap();
let mut conn = conn.lock().await;
assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
})
Expand All @@ -138,7 +177,17 @@ mod tests {
Chaindexing::create_initial_contract_addresses(&mut conn, &contracts).await;

let conn = Arc::new(Mutex::new(conn));
EventsIngester::ingest(conn.clone(), &contracts, 10, json_rpc).await.unwrap();
EventsIngester::ingest(
conn.clone(),
&contracts,
10,
json_rpc,
&Chain::Mainnet,
&MinConfirmationCount::new(1),
false,
)
.await
.unwrap();

let mut conn = conn.lock().await;
assert!(PostgresRepo::get_all_events(&mut conn).await.is_empty());
Expand Down
26 changes: 26 additions & 0 deletions chaindexing/src/chain_reorg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use std::cmp::max;

/// Tolerance for chain re-organization
#[derive(Clone)]
pub struct MinConfirmationCount {
value: u8,
}

impl MinConfirmationCount {
pub fn new(value: u8) -> Self {
Self { value }
}

pub fn ideduct_from(&self, block_number: i64, start_block_number: i64) -> u64 {
self.deduct_from(block_number as u64, start_block_number as u64)
}

pub fn deduct_from(&self, block_number: u64, start_block_number: u64) -> u64 {
max(start_block_number, block_number - (self.value as u64))
}
}

pub enum Execution<'a> {
Main,
Confirmation(&'a MinConfirmationCount),
}
9 changes: 4 additions & 5 deletions chaindexing/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use crate::{ChaindexingRepo, Chains, Contract};
use crate::{ChaindexingRepo, Chains, Contract, MinConfirmationCount};

#[derive(Clone)]
pub struct Config {
pub chains: Chains,
pub repo: ChaindexingRepo,
pub contracts: Vec<Contract>,
/// Tolerance for chain re-organization
pub min_confirmation_count: u8,
pub min_confirmation_count: MinConfirmationCount,
pub blocks_per_batch: u64,
pub handler_interval_ms: u64,
pub ingestion_interval_ms: u64,
Expand All @@ -19,7 +18,7 @@ impl Config {
repo,
chains,
contracts: vec![],
min_confirmation_count: 40,
min_confirmation_count: MinConfirmationCount::new(40),
blocks_per_batch: 20,
handler_interval_ms: 10000,
ingestion_interval_ms: 10000,
Expand All @@ -40,7 +39,7 @@ impl Config {
}

pub fn with_min_confirmation_count(mut self, min_confirmation_count: u8) -> Self {
self.min_confirmation_count = min_confirmation_count;
self.min_confirmation_count = MinConfirmationCount::new(min_confirmation_count);

self
}
Expand Down
2 changes: 0 additions & 2 deletions chaindexing/src/contract_states/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ pub trait ContractStateMigrations: Send + Sync {
validate_migration(user_migration);

if user_migration.starts_with("CREATE TABLE IF NOT EXISTS") {
// FUTURE TODO: Index user migration fields
// Index State Version fields
let create_state_table_migration = user_migration.to_owned().to_owned();

let create_state_versions_table_migration = append_migration(
Expand Down
33 changes: 3 additions & 30 deletions chaindexing/src/contracts.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::{collections::HashMap, str::FromStr, sync::Arc};

use crate::{
diesels::schema::chaindexing_contract_addresses, event_handlers::EventHandler,
ContractStateMigrations,
};
use crate::diesels::schema::chaindexing_contract_addresses;
use crate::{ContractStateMigrations, EventHandler};
use diesel::{Identifiable, Insertable, Queryable};

use ethers::{
Expand Down Expand Up @@ -100,10 +98,6 @@ impl Contract {
pub struct Contracts;

impl Contracts {
pub fn get_contract_addresses(contracts: &Vec<Contract>) -> Vec<UnsavedContractAddress> {
contracts.into_iter().flat_map(|c| c.addresses.clone()).collect()
}

pub fn get_state_migrations(
contracts: &Vec<Contract>,
) -> Vec<Arc<dyn ContractStateMigrations>> {
Expand Down Expand Up @@ -198,7 +192,7 @@ pub struct ContractAddress {
chain_id: i32,
pub next_block_number_to_ingest_from: i64,
pub next_block_number_to_handle_from: i64,
start_block_number: i64,
pub start_block_number: i64,
pub address: String,
pub contract_name: String,
}
Expand All @@ -211,24 +205,3 @@ impl ContractAddress {
serde_json::to_value(address).unwrap().as_str().unwrap().to_string()
}
}

pub struct ContractAddresses;

impl ContractAddresses {
pub fn group_by_addresses(
contract_addresses: &Vec<UnsavedContractAddress>,
) -> HashMap<Address, &UnsavedContractAddress> {
contract_addresses.iter().fold(
HashMap::new(),
|mut contract_addresses_by_addresses,
contract_address @ UnsavedContractAddress { address, .. }| {
contract_addresses_by_addresses.insert(
Address::from_str(&*address.as_str()).unwrap(),
contract_address,
);

contract_addresses_by_addresses
},
)
}
}
39 changes: 28 additions & 11 deletions chaindexing/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

use crate::contracts::{ContractAddress, ContractAddresses, Contracts};
use crate::contracts::{ContractAddress, Contracts};
use crate::diesels::schema::chaindexing_events;
use diesel::{Insertable, Queryable};
use ethers::abi::{LogParam, Token};
use ethers::types::Log;
use ethers::types::{Chain, Log};

use crate::{Contract, ContractEvent};
use uuid::Uuid;

#[derive(Debug, Clone, PartialEq, Queryable, Insertable)]
#[derive(Debug, Clone, Eq, Queryable, Insertable)]
#[diesel(table_name = chaindexing_events)]
pub struct Event {
pub id: Uuid,
Expand All @@ -29,14 +30,34 @@ pub struct Event {
inserted_at: chrono::NaiveDateTime,
}

impl PartialEq for Event {
fn eq(&self, other: &Self) -> bool {
self.chain_id == other.chain_id
&& self.contract_address == other.contract_address
&& self.abi == other.abi
&& self.log_params == other.log_params
&& self.block_hash == other.block_hash
}
}

impl Hash for Event {
fn hash<H: Hasher>(&self, state: &mut H) {
self.chain_id.hash(state);
self.contract_address.hash(state);
self.abi.hash(state);
self.log_params.to_string().hash(state);
self.block_hash.hash(state);
}
}

impl Event {
pub fn new(log: &Log, event: &ContractEvent, contract_name: &str, chain_id: i32) -> Self {
pub fn new(log: &Log, event: &ContractEvent, contract_name: &str, chain: &Chain) -> Self {
let log_params = event.value.parse_log(log.clone().into()).unwrap().params;
let parameters = Self::log_params_to_parameters(&log_params);

Self {
id: uuid::Uuid::new_v4(),
chain_id,
chain_id: *chain as i32,
contract_address: ContractAddress::address_to_string(&log.address),
contract_name: contract_name.to_string(),
abi: event.abi.clone(),
Expand Down Expand Up @@ -77,14 +98,10 @@ impl Event {
pub struct Events;

impl Events {
pub fn new(logs: &Vec<Log>, contracts: &Vec<Contract>) -> Vec<Event> {
pub fn new(logs: &Vec<Log>, contracts: &Vec<Contract>, chain: &Chain) -> Vec<Event> {
let events_by_topics = Contracts::group_events_by_topics(contracts);
let contracts_by_addresses = Contracts::group_by_addresses(contracts);

let contract_addresses = Contracts::get_contract_addresses(contracts);
let contract_addresses_by_addresses =
ContractAddresses::group_by_addresses(&contract_addresses);

logs.iter()
.map(
|log @ Log {
Expand All @@ -94,7 +111,7 @@ impl Events {
log,
&events_by_topics.get(&topics[0]).unwrap(),
&contracts_by_addresses.get(&address).unwrap().name,
contract_addresses_by_addresses.get(&address).unwrap().chain_id,
chain,
)
},
)
Expand Down
Loading

0 comments on commit c1d3fd2

Please sign in to comment.