Skip to content

Commit

Permalink
Merge pull request #18 from Jurshsmith/allow-resetting-to-index-from-scratch
Browse files Browse the repository at this point in the history

Allow resets
  • Loading branch information
Jurshsmith authored Oct 1, 2023
2 parents d589e72 + 56c48ec commit b4fe3f4
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 31 deletions.
8 changes: 8 additions & 0 deletions chaindexing/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub struct Config {
pub chains: Chains,
pub repo: ChaindexingRepo,
pub contracts: Vec<Contract>,
pub reset_count: u8,
pub blocks_per_batch: u64,
pub handler_interval_ms: u64,
pub ingestion_interval_ms: u64,
Expand All @@ -16,6 +17,7 @@ impl Config {
repo,
chains,
contracts: vec![],
reset_count: 0,
blocks_per_batch: 20,
handler_interval_ms: 10000,
ingestion_interval_ms: 10000,
Expand All @@ -28,6 +30,12 @@ impl Config {
self
}

pub fn reset(mut self, count: u8) -> Self {
self.reset_count = count;

self
}

pub fn with_blocks_per_batch(&self, blocks_per_batch: u64) -> Self {
Self {
blocks_per_batch,
Expand Down
32 changes: 23 additions & 9 deletions chaindexing/src/contract_states/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ pub trait ContractStateMigrations: Send + Sync {
// Index State Version fields
let create_state_table_migration = user_migration.to_owned().to_owned();

let create_state_versions_table_migration =
append_migration(&user_migration, &remaining_state_versions_migration());
let create_state_versions_table_migration = append_migration(
&user_migration,
&get_remaining_state_versions_migration(),
);
let create_state_versions_table_migration =
set_state_versions_table_name(&create_state_versions_table_migration);

Expand All @@ -27,8 +29,8 @@ pub trait ContractStateMigrations: Send + Sync {
extract_table_fields(&create_state_versions_table_migration);

let state_versions_unique_index_migration =
unique_index_migration_for_state_versions(
state_versions_table_name,
get_unique_index_migration_for_state_versions(
&state_versions_table_name,
state_versions_fields,
);

Expand All @@ -43,6 +45,18 @@ pub trait ContractStateMigrations: Send + Sync {
})
.collect()
}

/// Derives the inverse ("down") migrations for this contract's state:
/// for every `CREATE TABLE IF NOT EXISTS` migration returned by
/// `get_migrations`, emits a matching `DROP TABLE IF EXISTS` statement.
/// Non-CREATE migrations produce no reset counterpart.
fn get_reset_migrations(&self) -> Vec<String> {
    self.get_migrations()
        .iter()
        .filter_map(|migration| {
            migration
                .starts_with("CREATE TABLE IF NOT EXISTS")
                .then(|| format!("DROP TABLE IF EXISTS {}", extract_table_name(migration)))
        })
        .collect()
}
}

fn extract_table_name(migration: &str) -> String {
Expand Down Expand Up @@ -75,8 +89,8 @@ fn extract_table_fields(migration: &str) -> Vec<String> {
.collect()
}

fn unique_index_migration_for_state_versions(
table_name: String,
fn get_unique_index_migration_for_state_versions(
table_name: &str,
table_fields: Vec<String>,
) -> String {
let table_fields: Vec<String> =
Expand Down Expand Up @@ -110,7 +124,7 @@ fn append_migration(migration: &str, migration_to_append: &str) -> String {
.replace("),,", ",")
}

fn remaining_state_versions_migration() -> String {
fn get_remaining_state_versions_migration() -> String {
// No need for IDs, we will be using each field in the state
// and delegate the responsibility of creating unique states
// to the user, using whatever means necessary. The user can
Expand All @@ -120,11 +134,11 @@ fn remaining_state_versions_migration() -> String {
state_version_is_deleted BOOL NOT NULL default false,
{}
",
remaining_state_fields_migration()
get_remaining_state_fields_migration()
)
}

fn remaining_state_fields_migration() -> String {
fn get_remaining_state_fields_migration() -> String {
"state_version_contract_address TEXT NOT NULL,
state_chain_id INTEGER NOT NULL,
state_version_block_hash TEXT NOT NULL,
Expand Down
6 changes: 6 additions & 0 deletions chaindexing/src/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ impl Contracts {
contracts.into_iter().flat_map(|c| c.addresses.clone()).collect()
}

/// Collects the state migrations declared across all `contracts` into a
/// single flat list. `Arc` clones are cheap reference-count bumps.
pub fn get_state_migrations(
    contracts: &Vec<Contract>,
) -> Vec<Arc<dyn ContractStateMigrations>> {
    // `iter()`, not `into_iter()`: we only borrow `contracts` here, and
    // clippy (`into_iter_on_ref`) flags `into_iter` on a reference as a
    // confusing alias for `iter`.
    contracts.iter().flat_map(|c| c.state_migrations.clone()).collect()
}

pub fn get_all_event_handlers_by_event_abi(
contracts: &Vec<Contract>,
) -> HashMap<EventAbi, Arc<dyn EventHandler>> {
Expand Down
7 changes: 7 additions & 0 deletions chaindexing/src/diesels/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,11 @@ diesel::table! {
}
}

// Bookkeeping table for resets: one row is inserted per reset performed,
// so the row count records how many resets have happened.
// NOTE(review): semantics inferred from `Chaindexing::maybe_reset`, which
// compares the configured `reset_count` against the number of rows here —
// confirm against that call site.
diesel::table! {
    chaindexing_reset_counts (id) {
        id -> Int4,
        inserted_at -> Timestamptz,
    }
}

// NOTE(review): the new `chaindexing_reset_counts` table is not registered
// here. That is fine only as long as it never appears in the same diesel
// query as these two tables — confirm no joint query exists.
diesel::allow_tables_to_appear_in_same_query!(chaindexing_contract_addresses, chaindexing_events,);
1 change: 0 additions & 1 deletion chaindexing/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use ethers::types::Log;
use crate::{Contract, ContractEvent};
use uuid::Uuid;

/// Ingested EVM Events
#[derive(Debug, Clone, PartialEq, Queryable, Insertable)]
#[diesel(table_name = chaindexing_events)]
pub struct Event {
Expand Down
66 changes: 52 additions & 14 deletions chaindexing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ mod event_handlers;
mod events;
mod events_ingester;
mod repos;
mod reset_counts;

pub use chains::Chains;
pub use config::Config;
pub use contract_states::{ContractState, ContractStateError, ContractStateMigrations};
use contracts::Contracts;
pub use contracts::{Contract, ContractAddress, ContractEvent};
pub use diesel;
pub use diesel::prelude::QueryableByName;
Expand All @@ -19,6 +21,7 @@ pub use event_handlers::{EventHandler, EventHandlers};
pub use events::Event;
pub use events_ingester::{EventsIngester, EventsIngesterJsonRpc};
pub use repos::*;
pub use reset_counts::ResetCount;

pub use ethers::prelude::{Address, U256, U64};

Expand Down Expand Up @@ -54,33 +57,68 @@ impl Chaindexing {
}

/// Prepares the repo before indexing starts: ensures the reset-count
/// bookkeeping table exists, performs a reset when the configured
/// `reset_count` demands one, applies internal and contract-state
/// migrations, and seeds the initial contract addresses.
///
/// NOTE(review): the scraped diff interleaved the old and new bodies of
/// this function (duplicate client/pool acquisition and duplicated
/// migration calls); this is the reconstructed post-merge version.
pub async fn setup(config: &Config) -> Result<(), ()> {
    let Config {
        repo,
        contracts,
        reset_count,
        ..
    } = config;

    let client = repo.get_raw_query_client().await;
    let pool = repo.get_pool(1).await;
    let mut conn = ChaindexingRepo::get_conn(&pool).await;

    // Order matters: the reset-counts table must exist before
    // `maybe_reset` can count previous resets, and any reset must run
    // before migrations recreate the tables it drops.
    Self::run_migrations_for_resets(&client).await;
    Self::maybe_reset(reset_count, contracts, &client, &mut conn).await;
    Self::run_internal_migrations(&client).await;
    Self::run_migrations_for_contract_states(&client, contracts).await;
    Self::create_initial_contract_addresses(&mut conn, contracts).await;

    Ok(())
}

pub async fn run_internal_migrations(raw_query_client: &ChaindexingRepoRawQueryClient) {
/// Performs a reset (drop internal tables + contract-state tables, then
/// record the reset) when the user has requested more resets than have
/// already been performed. Each past reset is one row in the
/// reset-counts table, so `reset_count` acts as a monotonically
/// increasing "reset generation" the user bumps to force a re-index.
pub async fn maybe_reset<'a>(
    reset_count: &u8,
    contracts: &Vec<Contract>,
    client: &ChaindexingRepoRawQueryClient,
    conn: &mut ChaindexingRepoConn<'a>,
) {
    let previous_resets = ChaindexingRepo::get_reset_counts(conn).await;
    let requested = *reset_count as usize;

    if requested > previous_resets.len() {
        Self::reset_internal_migrations(client).await;
        Self::reset_migrations_for_contract_states(client, contracts).await;
        ChaindexingRepo::create_reset_count(conn).await;
    }
}

/// Creates the `chaindexing_reset_counts` bookkeeping table (idempotent —
/// must run before `maybe_reset` reads it).
pub async fn run_migrations_for_resets(client: &ChaindexingRepoRawQueryClient) {
    ChaindexingRepo::migrate(
        // `client` is already a reference — `&client` was a needless
        // double borrow (clippy `needless_borrow`).
        client,
        ChaindexingRepo::create_reset_counts_migration().to_vec(),
    )
    .await;
}
/// Applies chaindexing's own (non-user) schema migrations.
pub async fn run_internal_migrations(client: &ChaindexingRepoRawQueryClient) {
    // Pass `client` directly — `&client` was a needless double borrow
    // (clippy `needless_borrow`).
    ChaindexingRepo::migrate(client, ChaindexingRepo::get_internal_migrations()).await;
}
/// Reverses the internal migrations (drops chaindexing's own tables) as
/// part of a reset.
pub async fn reset_internal_migrations(client: &ChaindexingRepoRawQueryClient) {
    // Pass `client` directly — `&client` was a needless double borrow
    // (clippy `needless_borrow`).
    ChaindexingRepo::migrate(client, ChaindexingRepo::get_reset_internal_migrations()).await;
}

pub async fn run_migrations_for_contract_states(
raw_query_client: &ChaindexingRepoRawQueryClient,
client: &ChaindexingRepoRawQueryClient,
contracts: &Vec<Contract>,
) {
let states = contracts.iter().flat_map(|c| c.state_migrations.clone());

for state in states {
ChaindexingRepo::migrate(raw_query_client, state.get_migrations()).await;
for state_migration in Contracts::get_state_migrations(contracts) {
ChaindexingRepo::migrate(client, state_migration.get_migrations()).await;
}
}
/// Reverses every user-declared contract-state migration (drops the state
/// and state-versions tables) as part of a reset. Mirrors
/// `run_migrations_for_contract_states`, but applies each migration set's
/// reset (DROP) statements instead.
pub async fn reset_migrations_for_contract_states(
    client: &ChaindexingRepoRawQueryClient,
    contracts: &Vec<Contract>,
) {
    let state_migrations = Contracts::get_state_migrations(contracts);

    for migration in state_migrations {
        ChaindexingRepo::migrate(client, migration.get_reset_migrations()).await;
    }
}

Expand Down
25 changes: 24 additions & 1 deletion chaindexing/src/repos/postgres_repo.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use derive_more::Display;
use std::sync::Arc;

mod migrations;
Expand All @@ -6,7 +7,7 @@ mod raw_queries;
use crate::{
contracts::{ContractAddress, ContractAddressID, UnsavedContractAddress},
events::Event,
Streamable,
ResetCount, Streamable,
};
use diesel_async::RunQueryDsl;

Expand All @@ -28,6 +29,11 @@ pub use diesel_async::{

pub use raw_queries::{PostgresRepoRawQueryClient, PostgresRepoRawQueryTxnClient};

/// Errors surfaced by the Postgres-backed repo. `Display` is derived via
/// `derive_more::Display`, so variants render by name.
/// NOTE(review): no use of `RepoNotMigrated` is visible in this diff —
/// confirm where it is produced/consumed.
#[derive(Clone, Debug, Display)]
pub enum PostgresRepoError {
    RepoNotMigrated,
}

#[derive(Clone)]
pub struct PostgresRepo {
url: String,
Expand All @@ -39,6 +45,7 @@ type PgPooledConn<'a> = bb8::PooledConnection<'a, AsyncDieselConnectionManager<A
impl Repo for PostgresRepo {
type Conn<'a> = PgPooledConn<'a>;
type Pool = bb8::Pool<AsyncDieselConnectionManager<AsyncPgConnection>>;
type RepoError = PostgresRepoError;

fn new(url: &str) -> Self {
Self {
Expand Down Expand Up @@ -141,6 +148,22 @@ impl Repo for PostgresRepo {
.await
.unwrap();
}

/// Records that one reset was performed by inserting a default row into
/// `chaindexing_reset_counts` (`id` and `inserted_at` are DB-generated).
/// Panics via `unwrap` if the insert fails — consistent with the other
/// repo methods in this impl, which also `unwrap` their DB results.
async fn create_reset_count<'a>(conn: &mut Self::Conn<'a>) {
    use crate::diesels::schema::chaindexing_reset_counts::dsl::*;

    diesel::insert_into(chaindexing_reset_counts)
        .default_values()
        .execute(conn)
        .await
        .unwrap();
}

/// Loads all recorded resets; callers (e.g. `Chaindexing::maybe_reset`)
/// use the row count to decide whether another reset is due.
/// Panics via `unwrap` if the query fails — consistent with the other
/// repo methods in this impl.
async fn get_reset_counts<'a>(conn: &mut Self::Conn<'a>) -> Vec<ResetCount> {
    use crate::diesels::schema::chaindexing_reset_counts::dsl::*;

    chaindexing_reset_counts.load(conn).await.unwrap()
}
}

impl Streamable for PostgresRepo {
Expand Down
10 changes: 10 additions & 0 deletions chaindexing/src/repos/postgres_repo/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,20 @@ impl RepoMigrations for PostgresRepo {
// SQL statements that create the contract-addresses table(s).
fn create_contract_addresses_migration() -> &'static [&'static str] {
    SQLikeMigrations::create_contract_addresses()
}
// SQL statements that drop the contract-addresses table(s), for resets.
fn drop_contract_addresses_migration() -> &'static [&'static str] {
    SQLikeMigrations::drop_contract_addresses()
}

// SQL statements that create the events table(s).
fn create_events_migration() -> &'static [&'static str] {
    SQLikeMigrations::create_events()
}
// SQL statements that drop the events table(s), for resets.
fn drop_events_migration() -> &'static [&'static str] {
    SQLikeMigrations::drop_events()
}

// SQL statements that create the reset-counts bookkeeping table.
// Note there is deliberately no matching drop: reset history must
// survive resets, since it is what `maybe_reset` counts.
fn create_reset_counts_migration() -> &'static [&'static str] {
    SQLikeMigrations::create_reset_counts()
}
}

// Marker impl: PostgresRepo opts into `Migratable` — presumably all
// behavior comes from the trait's default methods (confirm in the repos
// module, which is not visible in this diff).
impl Migratable for PostgresRepo {}
Loading

0 comments on commit b4fe3f4

Please sign in to comment.