Skip to content

Commit

Permalink
feat: load block handler config dynamically at runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
luis-herasme committed Jul 26, 2024
1 parent 97774b9 commit 6128230
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 64 deletions.
4 changes: 2 additions & 2 deletions ghost-crab-common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub struct DataSource {

#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct BlockHandler {
pub struct BlockHandlerConfig {
pub start_block: u64,
pub network: String,
pub execution_mode: Option<ExecutionMode>,
Expand All @@ -52,7 +52,7 @@ pub struct Config {
pub data_sources: HashMap<String, DataSource>,
pub templates: HashMap<String, Template>,
pub networks: HashMap<String, NetworkConfig>,
pub block_handlers: HashMap<String, BlockHandler>,
pub block_handlers: HashMap<String, BlockHandlerConfig>,
}

#[derive(Debug)]
Expand Down
41 changes: 4 additions & 37 deletions ghost-crab-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,8 @@ pub fn block_handler(metadata: TokenStream, input: TokenStream) -> TokenStream {
}

let config = config::load().unwrap();
let source = config.block_handlers.get(name).expect("Source not found.");

let network_config =
config.networks.get(&source.network).expect("RPC url not found for network");
let rpc_url = Literal::string(&network_config.rpc_url);
let requests_per_second = Literal::u64_suffixed(network_config.requests_per_second);

let step = Literal::u64_suffixed(source.step);
let start_block = Literal::u64_suffixed(source.start_block);
let network = Literal::string(&source.network);

let execution_mode = match source.execution_mode {
Some(ExecutionMode::Serial) => quote! { ExecutionMode::Serial },
_ => quote! { ExecutionMode::Parallel },
};
let _ = config.block_handlers.get(name).expect("BlockHandler not found in the config.json");
let name = Literal::string(name);

let parsed = parse_macro_input!(input as ItemFn);
let fn_name = parsed.sig.ident.clone();
Expand All @@ -61,28 +48,8 @@ pub fn block_handler(metadata: TokenStream, input: TokenStream) -> TokenStream {
#fn_body
}

fn step(&self) -> u64 {
#step
}

fn network(&self) -> String {
String::from(#network)
}

fn rpc_url(&self) -> String {
String::from(#rpc_url)
}

fn rate_limit(&self) -> u64 {
#requests_per_second
}

fn start_block(&self) -> u64 {
#start_block
}

fn execution_mode(&self) -> ExecutionMode {
#execution_mode
fn name() -> String {
String::from(#name)
}
}
})
Expand Down
19 changes: 7 additions & 12 deletions ghost-crab/src/block_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use alloy::rpc::types::eth::Block;
use alloy::rpc::types::eth::BlockNumberOrTag;
use alloy::transports::TransportError;
use async_trait::async_trait;
use ghost_crab_common::config::BlockHandlerConfig;
use ghost_crab_common::config::ExecutionMode;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -27,28 +28,22 @@ pub type BlockHandlerInstance = Arc<Box<(dyn BlockHandler + Send + Sync)>>;
#[async_trait]
pub trait BlockHandler {
async fn handle(&self, params: BlockContext);
fn step(&self) -> u64;
fn network(&self) -> String;
fn rpc_url(&self) -> String;
fn start_block(&self) -> u64;
fn rate_limit(&self) -> u64;
fn execution_mode(&self) -> ExecutionMode;
fn name(&self) -> String;
}

pub struct ProcessBlocksInput {
pub handler: BlockHandlerInstance,
pub templates: TemplateManager,
pub provider: CacheProvider,
pub config: BlockHandlerConfig,
}

pub async fn process_blocks(
ProcessBlocksInput { handler, templates, provider }: ProcessBlocksInput,
ProcessBlocksInput { handler, templates, provider, config }: ProcessBlocksInput,
) -> Result<(), TransportError> {
let step = handler.step();
let start_block = handler.start_block();
let execution_mode = handler.execution_mode();
let execution_mode = config.execution_mode.unwrap_or(ExecutionMode::Parallel);

let mut current_block = start_block;
let mut current_block = config.start_block;
let mut latest_block_manager =
LatestBlockManager::new(provider.clone(), Duration::from_secs(10));

Expand Down Expand Up @@ -83,6 +78,6 @@ pub async fn process_blocks(
}
}

current_block += step;
current_block += config.step;
}
}
40 changes: 27 additions & 13 deletions ghost-crab/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::block_handler::{process_blocks, BlockHandlerInstance, ProcessBlocksIn
use crate::cache::manager::RPCManager;
use crate::event_handler::{process_events, EventHandlerInstance, ProcessEventsInput};
use alloy::primitives::Address;
use ghost_crab_common::config::{self, Config, ConfigError};
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::{self, Receiver, Sender};

Expand All @@ -28,19 +29,23 @@ pub struct Indexer {
block_handlers: Vec<ProcessBlocksInput>,
templates: TemplateManager,
rpc_manager: RPCManager,
config: Config,
}

impl Indexer {
pub fn new() -> Indexer {
pub fn new() -> Result<Indexer, ConfigError> {
let (tx, rx) = mpsc::channel::<Template>(1);

Indexer {
let config = config::load()?;

Ok(Indexer {
config,
handlers: Vec::new(),
block_handlers: Vec::new(),
templates: TemplateManager { tx },
rpc_manager: RPCManager::new(),
rx,
}
})
}

pub async fn load_event_handler(&mut self, handler: EventHandlerInstance) {
Expand All @@ -64,16 +69,25 @@ impl Indexer {
}

pub async fn load_block_handler(&mut self, handler: BlockHandlerInstance) {
let provider = self
.rpc_manager
.get_or_create(handler.network(), handler.rpc_url(), handler.rate_limit())
.await;

self.block_handlers.push(ProcessBlocksInput {
handler,
templates: self.templates.clone(),
provider,
});
if let Some(block_config) = self.config.block_handlers.remove(&handler.name()) {
if let Some(network) = self.config.networks.get(&block_config.network) {
let provider = self
.rpc_manager
.get_or_create(
block_config.network.clone(),
network.rpc_url.clone(),
network.requests_per_second,
)
.await;

self.block_handlers.push(ProcessBlocksInput {
handler,
templates: self.templates.clone(),
provider,
config: block_config,
});
}
}
}

pub async fn start(mut self) {
Expand Down

0 comments on commit 6128230

Please sign in to comment.