Skip to content

Commit

Permalink
Drop Reth provider asap. Fixes #18 (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
naps62 authored Dec 15, 2023
1 parent 2d8fc2d commit fe2641a
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 31 deletions.
4 changes: 2 additions & 2 deletions benches/usdc_holders_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use diesel::{
sql_types::{Array, Bytea, Integer},
RunQueryDsl,
};
use iron_indexer::sync::RethProvider;
use iron_indexer::sync::RethProviderFactory;
use iron_indexer::{
config::Config,
db::{types::Address, Db},
Expand Down Expand Up @@ -56,7 +56,7 @@ async fn run(config: Config) -> Result<()> {
let db = Db::connect(&config, account_tx, job_tx).await?;
let chain = db.setup_chain(&config.chain).await?;

let provider_factory = Arc::new(RethProvider::new(&config, &chain)?);
let provider_factory = Arc::new(RethProviderFactory::new(&config, &chain)?);
let backfill = BackfillManager::new(
db.clone(),
&config,
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};

use config::Config;

use crate::sync::{RethProvider, StopStrategy};
use crate::sync::{RethProviderFactory, StopStrategy};

use self::db::Db;
use self::sync::{BackfillManager, Forward, SyncJob};
Expand All @@ -30,7 +30,7 @@ async fn main() -> Result<()> {
let (job_tx, job_rx) = mpsc::unbounded_channel();
let db = Db::connect(&config, account_tx, job_tx).await?;
let chain = db.setup_chain(&config.chain).await?;
let provider_factory = Arc::new(RethProvider::new(&config, &chain)?);
let provider_factory = Arc::new(RethProviderFactory::new(&config, &chain)?);
let token = CancellationToken::new();

// setup each task
Expand Down
11 changes: 6 additions & 5 deletions src/sync/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
db::{models::BackfillJobWithId, Db},
};

use super::{RethProvider, SyncJob, Worker};
use super::{RethProviderFactory, SyncJob, Worker};

#[derive(Debug)]
pub enum StopStrategy {
Expand Down Expand Up @@ -45,14 +45,14 @@ pub struct BackfillManager {
jobs_rcv: UnboundedReceiver<()>,
config: Arc<RwLock<Config>>,
stop: StopStrategy,
provider_factory: Arc<RethProvider>,
provider_factory: Arc<RethProviderFactory>,
}

impl BackfillManager {
pub fn new(
db: Db,
config: &Config,
provider_factory: Arc<RethProvider>,
provider_factory: Arc<RethProviderFactory>,
jobs_rcv: UnboundedReceiver<()>,
stop: StopStrategy,
) -> Self {
Expand Down Expand Up @@ -145,12 +145,13 @@ impl SyncJob for Worker<Backfill> {
#[instrument(skip(self), fields(chain_id = self.chain.chain_id))]
async fn run(mut self) -> Result<()> {
for block in (self.inner.low..self.inner.high).rev() {
let provider = self.provider_factory.get()?;
// start by checking shutdown signal
if self.cancellation_token.is_cancelled() {
break;
}

let header = self.provider.header_by_number(block)?.unwrap();
let header = provider.header_by_number(block)?.unwrap();
self.process_block(&header).await?;
self.maybe_flush(block).await?;
}
Expand Down Expand Up @@ -189,7 +190,7 @@ impl Backfill {
db: Db,
config: Arc<RwLock<Config>>,
job: BackfillJobWithId,
provider_factory: Arc<RethProvider>,
provider_factory: Arc<RethProviderFactory>,
cancellation_token: CancellationToken,
) -> Result<Worker<Self>> {
let config = config.read().await;
Expand Down
7 changes: 4 additions & 3 deletions src/sync/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::{info, instrument};
use crate::db::models::Chain;
use crate::{config::Config, db::Db};

use super::{RethProvider, SyncJob, Worker};
use super::{RethProviderFactory, SyncJob, Worker};

/// Main sync job
/// Walks the blockchain forward, from a pre-configured starting block.
Expand Down Expand Up @@ -39,7 +39,8 @@ impl SyncJob for Worker<Forward> {

self.process_new_accounts().await?;

match self.provider.header_by_number(self.inner.next_block)? {
let provider = self.provider_factory.get()?;
match provider.header_by_number(self.inner.next_block)? {
// got a block. process it, only flush if needed
Some(header) => {
self.process_block(&header).await?;
Expand Down Expand Up @@ -111,7 +112,7 @@ impl Forward {
db: Db,
config: &Config,
chain: Chain,
provider_factory: Arc<RethProvider>,
provider_factory: Arc<RethProviderFactory>,
accounts_rcv: UnboundedReceiver<Address>,
cancellation_token: CancellationToken,
) -> Result<Worker<Self>> {
Expand Down
29 changes: 12 additions & 17 deletions src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@ use alloy_primitives::{Address, B256};
use async_trait::async_trait;
use color_eyre::eyre::{eyre, Result};
use rand::{rngs::StdRng, SeedableRng};
use reth_db::mdbx::tx::Tx;
use reth_db::mdbx::RO;
use reth_primitives::Header;
use reth_provider::{
BlockNumReader, BlockReader, DatabaseProvider, ReceiptProvider, TransactionsProvider,
};
use reth_provider::{BlockNumReader, BlockReader, ReceiptProvider, TransactionsProvider};
use scalable_cuckoo_filter::{DefaultHasher, ScalableCuckooFilter, ScalableCuckooFilterBuilder};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
Expand All @@ -34,15 +30,14 @@ use crate::{

pub use backfill::{BackfillManager, StopStrategy};
pub use forward::Forward;
pub use provider::RethProvider;
pub use provider::RethProviderFactory;

/// Generic sync job state
#[derive(Debug)]
pub struct Worker<T: std::fmt::Debug> {
inner: T,

provider: DatabaseProvider<Tx<RO>>,
provider_factory: Arc<RethProvider>,
provider_factory: Arc<RethProviderFactory>,

/// DB handle
db: Db,
Expand Down Expand Up @@ -85,11 +80,9 @@ impl<T: std::fmt::Debug> Worker<T> {
db: Db,
config: &Config,
chain: Chain,
provider_factory: Arc<RethProvider>,
provider_factory: Arc<RethProviderFactory>,
cancellation_token: CancellationToken,
) -> Result<Self> {
let provider = provider_factory.get()?;

let addresses: BTreeSet<_> = db.get_addresses().await?.into_iter().map(|a| a.0).collect();
let mut cuckoo = ScalableCuckooFilterBuilder::new()
.initial_capacity(addresses.len())
Expand All @@ -102,7 +95,6 @@ impl<T: std::fmt::Debug> Worker<T> {

Ok(Self {
inner,
provider,
provider_factory,
db,
chain,
Expand All @@ -129,32 +121,35 @@ impl<T: std::fmt::Debug> Worker<T> {
async fn wait_new_block(&mut self, block: u64) -> Result<()> {
trace!(event = "wait", block);
loop {
self.provider = self.provider_factory.get()?;
let provider = self.provider_factory.get()?;

let latest = self.provider.last_block_number().unwrap();
let latest = provider.last_block_number().unwrap();

if latest >= block {
trace!("new block(s) found. from: {}, latest: {}", block, latest);
return Ok(());
}

drop(provider);

sleep(Duration::from_secs(2)).await;
}
}

async fn process_block(&mut self, header: &Header) -> Result<()> {
let indices = match self.provider.block_body_indices(header.number)? {
let provider = self.provider_factory.get()?;
let indices = match provider.block_body_indices(header.number)? {
Some(indices) => indices,
None => return Err(eyre!("err")),
};

for tx_id in indices.first_tx_num..indices.first_tx_num + indices.tx_count {
let tx = match self.provider.transaction_by_id_no_hash(tx_id)? {
let tx = match provider.transaction_by_id_no_hash(tx_id)? {
Some(tx) => tx,
None => continue,
};

let receipt = match self.provider.receipt(tx_id)? {
let receipt = match provider.receipt(tx_id)? {
Some(receipt) => receipt,
None => continue,
};
Expand Down
4 changes: 2 additions & 2 deletions src/sync/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ use crate::{config::Config, db::models::Chain};
/// While the indexer is heavily coupled to this particular provider,
/// it still benefits from abstracting it so it can be swapped out for testing purposes
#[derive(Debug)]
pub struct RethProvider {
pub struct RethProviderFactory {
/// Reth Provider factory
factory: ProviderFactory<DatabaseEnv>,
}

impl RethProvider {
impl RethProviderFactory {
/// Creates a new Reth DB provider
pub fn new(config: &Config, chain: &Chain) -> Result<Self> {
let chain_id = chain.chain_id as u64;
Expand Down

0 comments on commit fe2641a

Please sign in to comment.