Skip to content

Commit

Permalink
feat: Load "event_handler" config dynamically at runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
luis-herasme committed Jul 26, 2024
1 parent 8b703ab commit eb2c7ef
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 96 deletions.
77 changes: 5 additions & 72 deletions ghost-crab-macros/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
extern crate proc_macro;
use ghost_crab_common::config::{self, ExecutionMode};
use ghost_crab_common::config;
use proc_macro::TokenStream;
use proc_macro2::{Ident, Literal};
use quote::{format_ident, quote};
Expand Down Expand Up @@ -103,51 +103,12 @@ fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool)
let (name, event_name) = get_source_and_event(metadata);
let config = config::load().unwrap();

let abi;
let network;
let execution_mode;
let address;
let start_block;

if is_template {
let abi = if is_template {
let source = config.templates.get(&name).expect("Source not found.");

abi = source.abi.clone();
network = source.network.clone();
execution_mode = source.execution_mode.clone().unwrap_or(ExecutionMode::Parallel);
address = quote! {
Address::ZERO
};
start_block = Literal::u64_suffixed(0);
Literal::string(&source.abi)
} else {
let source = config.data_sources.get(&name).expect("Source not found.");

abi = source.abi.clone();
network = source.network.clone();
execution_mode = source.execution_mode.clone().unwrap_or(ExecutionMode::Parallel);

let address_literal = Literal::string(&source.address[2..]);

address = quote! {
address!(#address_literal)
};
start_block = Literal::u64_suffixed(source.start_block);
};

let network_config = config.networks.get(&network).expect("RPC url not found for network");
let rpc_url = Literal::string(&network_config.rpc_url);
let requests_per_second = Literal::u64_suffixed(network_config.requests_per_second);

let abi = Literal::string(&abi);
let network = Literal::string(&network);

let execution_mode = match execution_mode {
ExecutionMode::Parallel => quote! {
ExecutionMode::Parallel
},
ExecutionMode::Serial => quote! {
ExecutionMode::Serial
},
Literal::string(&source.abi)
};

let parsed = parse_macro_input!(input as ItemFn);
Expand Down Expand Up @@ -187,38 +148,10 @@ fn create_handler(metadata: TokenStream, input: TokenStream, is_template: bool)
#fn_body
}

fn start_block(&self) -> u64 {
#start_block
}

fn get_source(&self) -> String {
fn name(&self) -> String {
String::from(#data_source)
}

fn is_template(&self) -> bool {
#is_template
}

fn address(&self) -> Address {
#address
}

fn network(&self) -> String {
String::from(#network)
}

fn rpc_url(&self) -> String {
String::from(#rpc_url)
}

fn rate_limit(&self) -> u64 {
#requests_per_second
}

fn execution_mode(&self) -> ExecutionMode {
#execution_mode
}

fn event_signature(&self) -> String {
#contract_name::#event_name::SIGNATURE.to_string()
}
Expand Down
15 changes: 4 additions & 11 deletions ghost-crab/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,7 @@ pub type EventHandlerInstance = Arc<Box<(dyn EventHandler + Send + Sync)>>;
#[async_trait]
pub trait EventHandler {
async fn handle(&self, params: EventContext);
fn get_source(&self) -> String;
fn is_template(&self) -> bool;
fn start_block(&self) -> u64;
fn address(&self) -> Address;
fn network(&self) -> String;
fn rpc_url(&self) -> String;
fn rate_limit(&self) -> u64;
fn execution_mode(&self) -> ExecutionMode;
fn name(&self) -> String;
fn event_signature(&self) -> String;
}

Expand All @@ -57,12 +50,12 @@ pub struct ProcessEventsInput {
pub handler: EventHandlerInstance,
pub templates: TemplateManager,
pub provider: CacheProvider,
pub execution_mode: ExecutionMode,
}

pub async fn process_events(
ProcessEventsInput { start_block, step, address, handler, templates, provider }: ProcessEventsInput,
ProcessEventsInput { start_block, execution_mode, step, address, handler, templates, provider }: ProcessEventsInput,
) -> Result<(), TransportError> {
let execution_mode = handler.execution_mode();
let event_signature = handler.event_signature();

let mut current_block = start_block;
Expand All @@ -82,7 +75,7 @@ pub async fn process_events(
continue;
}

let source = handler.get_source();
let source = handler.name();

println!("[{}] Processing logs from {} to {}", source, current_block, end_block);

Expand Down
64 changes: 51 additions & 13 deletions ghost-crab/src/indexer/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,42 @@ impl Indexer {
})
}

pub async fn load_event_handler(&mut self, handler: EventHandlerInstance) {
if handler.is_template() {
return;
}
pub async fn load_event_handler(
&mut self,
handler: EventHandlerInstance,
) -> Result<(), AddHandlerError> {
let event_config = self
.config
.data_sources
.remove(&handler.name())
.ok_or(AddHandlerError::NotFound(handler.name()))?;

let network = self
.config
.networks
.get(&event_config.network)
.ok_or(AddHandlerError::NetworkNotFound(event_config.network.clone()))?;

let provider = self
.rpc_manager
.get_or_create(handler.network(), handler.rpc_url(), handler.rate_limit())
.get_or_create(
event_config.network,
network.rpc_url.clone(),
network.requests_per_second,
)
.await;

self.handlers.push(ProcessEventsInput {
start_block: handler.start_block(),
address: handler.address(),
start_block: event_config.start_block,
address: event_config.address.parse().unwrap(),
step: 10_000,
handler,
templates: self.templates.clone(),
provider,
execution_mode: event_config.execution_mode.unwrap_or(config::ExecutionMode::Parallel),
});

Ok(())
}

pub async fn load_block_handler(
Expand Down Expand Up @@ -88,7 +106,7 @@ impl Indexer {
Ok(())
}

pub async fn start(mut self) {
pub async fn start(mut self) -> Result<(), AddHandlerError> {
for block_handler in self.block_handlers {
tokio::spawn(async move {
if let Err(error) = process_blocks(block_handler).await {
Expand All @@ -107,11 +125,26 @@ impl Indexer {

// For dynamic sources (Templates)
while let Some(template) = self.rx.recv().await {
let network = template.handler.network();
let rpc_url = template.handler.rpc_url();
let rate_limit = template.handler.rate_limit();

let provider = self.rpc_manager.get_or_create(network, rpc_url, rate_limit).await;
let template_config = self
.config
.templates
.get(&template.handler.name())
.ok_or(AddHandlerError::NotFound(template.handler.name()))?;

let network = self
.config
.networks
.get(&template_config.network)
.ok_or(AddHandlerError::NetworkNotFound(template_config.network.clone()))?;

let provider = self
.rpc_manager
.get_or_create(
template_config.network.clone(),
network.rpc_url.clone(),
network.requests_per_second,
)
.await;

let handler = ProcessEventsInput {
start_block: template.start_block,
Expand All @@ -120,6 +153,9 @@ impl Indexer {
handler: template.handler,
templates: self.templates.clone(),
provider,
execution_mode: template_config
.execution_mode
.unwrap_or(config::ExecutionMode::Parallel),
};

tokio::spawn(async move {
Expand All @@ -128,5 +164,7 @@ impl Indexer {
}
});
}

Ok(())
}
}

0 comments on commit eb2c7ef

Please sign in to comment.