Skip to content

Commit

Permalink
feat: add max_block_range to networks - #55
Browse files Browse the repository at this point in the history
  • Loading branch information
joshstevens19 committed Jul 21, 2024
1 parent cc16b04 commit 8ff14c1
Show file tree
Hide file tree
Showing 16 changed files with 260 additions and 169 deletions.
1 change: 1 addition & 0 deletions cli/src/commands/new.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub fn handle_new_command(
chain_id: 1,
rpc: "https://mainnet.gateway.tenderly.co".to_string(),
compute_units_per_second: None,
max_block_range: None,
}],
contracts: vec![Contract {
name: "RocketPoolETH".to_string(),
Expand Down
15 changes: 14 additions & 1 deletion core/src/event/callback_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,27 @@ pub struct TxInformation {
pub transaction_index: U64,
}

#[derive(Debug, Clone)]
pub struct LogFoundInRequest {
pub from_block: U64,
pub to_block: U64,
}

#[derive(Debug, Clone)]
pub struct EventResult {
pub log: Log,
pub decoded_data: Arc<dyn Any + Send + Sync>,
pub tx_information: TxInformation,
pub found_in_request: LogFoundInRequest,
}

impl EventResult {
pub fn new(network_contract: Arc<NetworkContract>, log: Log) -> Self {
pub fn new(
network_contract: Arc<NetworkContract>,
log: Log,
start_block: U64,
end_block: U64,
) -> Self {
let log_meta = LogMeta::from(&log);
let log_address = log.address;
Self {
Expand All @@ -57,6 +69,7 @@ impl EventResult {
transaction_index: log_meta.transaction_index,
log_index: log_meta.log_index,
},
found_in_request: LogFoundInRequest { from_block: start_block, to_block: end_block },
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions core/src/generator/networks_bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub fn network_provider_fn_name(network: &Network) -> String {
fn generate_network_lazy_provider_code(network: &Network) -> Code {
Code::new(format!(
r#"
static ref {network_name}: Arc<JsonRpcCachedProvider> = create_client(&public_read_env_value("{network_url}").unwrap_or("{network_url}".to_string()), {compute_units_per_second}).expect("Error creating provider");
static ref {network_name}: Arc<JsonRpcCachedProvider> = create_client(&public_read_env_value("{network_url}").unwrap_or("{network_url}".to_string()), {compute_units_per_second}, {max_block_range}).expect("Error creating provider");
"#,
network_name = network_provider_name(network),
network_url = network.rpc,
Expand All @@ -24,7 +24,12 @@ fn generate_network_lazy_provider_code(network: &Network) -> Code {
format!("Some({})", compute_units_per_second)
} else {
"None".to_string()
}
},
max_block_range = if let Some(max_block_range) = network.max_block_range {
format!("Some(U64::from({}))", max_block_range)
} else {
"None".to_string()
},
))
}

Expand Down Expand Up @@ -80,6 +85,7 @@ pub fn generate_networks_code(networks: &[Network]) -> Code {
/// Any manual changes to this file will be overwritten.
use ethers::providers::{Provider, Http, RetryClient};
use ethers::types::U64;
use rindexer::{
lazy_static,
provider::{create_client, JsonRpcCachedProvider},
Expand Down
26 changes: 20 additions & 6 deletions core/src/indexer/fetch_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ethers::{
use regex::Regex;
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use crate::{
event::{config::EventProcessingConfig, RindexerEventFilter},
Expand All @@ -18,7 +18,6 @@ use crate::{

pub struct FetchLogsResult {
pub logs: Vec<Log>,
#[allow(dead_code)]
pub from_block: U64,
pub to_block: U64,
}
Expand All @@ -35,10 +34,25 @@ pub fn fetch_logs_stream(

tokio::spawn(async move {
let snapshot_to_block = initial_filter.get_to_block();
let from_block = initial_filter.get_from_block();
let mut current_filter = initial_filter;

// Process historical logs first
let mut max_block_range_limitation = None;
// add any max block range limitation before we start processing
let mut max_block_range_limitation =
config.network_contract.cached_provider.max_block_range;
if max_block_range_limitation.is_some() {
current_filter = current_filter.set_to_block(calculate_process_historic_log_to_block(
&from_block,
&snapshot_to_block,
&max_block_range_limitation,
));
warn!(
"{} - {} - max block range limitation of {} blocks applied - block range indexing will be slower then RPC providers supplying the optimal ranges - https://rindexer.xyz/docs/references/rpc-node-providers#rpc-node-providers",
config.info_log_name,
IndexingEventProgressStatus::Syncing.log(),
max_block_range_limitation.unwrap()
);
}
while current_filter.get_from_block() <= snapshot_to_block {
let semaphore_client = Arc::clone(&config.semaphore);
let permit = semaphore_client.acquire_owned().await;
Expand All @@ -60,7 +74,7 @@ pub fn fetch_logs_stream(

// slow indexing warn user
if let Some(range) = max_block_range_limitation {
info!(
warn!(
"{} - RPC PROVIDER IS SLOW - Slow indexing mode enabled, max block range limitation: {} blocks - we advise using a faster provider who can predict the next block ranges.",
&config.info_log_name,
range
Expand Down Expand Up @@ -146,7 +160,7 @@ async fn fetch_historic_logs_stream(

return Some(ProcessHistoricLogsStreamResult {
next: current_filter.set_from_block(to_block),
max_block_range_limitation: None,
max_block_range_limitation,
});
}

Expand Down
13 changes: 2 additions & 11 deletions core/src/indexer/no_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,24 +145,15 @@ fn no_code_callback(params: Arc<NoCodeCallbackParams>) -> EventCallbackType {
return Ok(());
}

let from_block = match results.first() {
Some(first) => first.tx_information.block_number,
let (from_block, to_block) = match results.first() {
Some(first) => (first.found_in_request.from_block, first.found_in_request.to_block),
None => {
let error_message = "Unexpected error: no first event despite non-zero length.";
error!("{}", error_message);
return Err(error_message.to_string());
}
};

let to_block = match results.last() {
Some(last) => last.tx_information.block_number,
None => {
let error_message = "Unexpected error: no last event despite non-zero length.";
error!("{}", error_message);
return Err(error_message.to_string());
}
};

let mut indexed_count = 0;
let mut postgres_bulk_data: Vec<Vec<EthereumSqlTypeWrapper>> = Vec::new();
let mut postgres_bulk_column_types: Vec<PgType> = Vec::new();
Expand Down
9 changes: 8 additions & 1 deletion core/src/indexer/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,14 @@ async fn handle_logs_result(
let fn_data = result
.logs
.into_iter()
.map(|log| EventResult::new(Arc::clone(&config.network_contract), log))
.map(|log| {
EventResult::new(
Arc::clone(&config.network_contract),
log,
result.from_block,
result.to_block,
)
})
.collect::<Vec<_>>();

if !fn_data.is_empty() {
Expand Down
24 changes: 2 additions & 22 deletions core/src/manifest/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,16 @@ use ethers::{
addressbook::Address,
prelude::{Filter, ValueOrArray, U64},
};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde::{Deserialize, Serialize};

use super::core::{deserialize_option_u64_from_string, serialize_option_u64_as_string};
use crate::{
event::contract_setup::{
AddressDetails, ContractEventMapping, FilterDetails, IndexingContractSetup,
},
indexer::parse_topic,
};

fn deserialize_option_u64_from_string<'de, D>(deserializer: D) -> Result<Option<U64>, D::Error>
where
D: Deserializer<'de>,
{
let s: Option<String> = Option::deserialize(deserializer)?;
match s {
Some(string) => U64::from_dec_str(&string).map(Some).map_err(serde::de::Error::custom),
None => Ok(None),
}
}

fn serialize_option_u64_as_string<S>(value: &Option<U64>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *value {
Some(ref u64_value) => serializer.serialize_some(&u64_value.as_u64().to_string()),
None => serializer.serialize_none(),
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct EventInputIndexedFilters {
pub event_name: String,
Expand Down
25 changes: 25 additions & 0 deletions core/src/manifest/core.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ethers::prelude::U64;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_yaml::Value;

Expand Down Expand Up @@ -94,3 +95,27 @@ impl Manifest {
self.storage.csv_enabled() && contract_csv_enabled
}
}

pub fn deserialize_option_u64_from_string<'de, D>(deserializer: D) -> Result<Option<U64>, D::Error>
where
D: Deserializer<'de>,
{
let s: Option<String> = Option::deserialize(deserializer)?;
match s {
Some(string) => U64::from_dec_str(&string).map(Some).map_err(serde::de::Error::custom),
None => Ok(None),
}
}

pub fn serialize_option_u64_as_string<S>(
value: &Option<U64>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match *value {
Some(ref u64_value) => serializer.serialize_some(&u64_value.as_u64().to_string()),
None => serializer.serialize_none(),
}
}
11 changes: 11 additions & 0 deletions core/src/manifest/network.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use ethers::prelude::U64;
use serde::{Deserialize, Serialize};

use super::core::{deserialize_option_u64_from_string, serialize_option_u64_as_string};

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct Network {
pub name: String,
Expand All @@ -10,4 +13,12 @@ pub struct Network {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub compute_units_per_second: Option<u64>,

#[serde(
default,
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_option_u64_from_string",
serialize_with = "serialize_option_u64_as_string"
)]
pub max_block_range: Option<U64>,
}
22 changes: 16 additions & 6 deletions core/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@ use crate::{event::RindexerEventFilter, manifest::core::Manifest};
pub struct JsonRpcCachedProvider {
provider: Arc<Provider<RetryClient<Http>>>,
cache: Mutex<Option<(Instant, Arc<Block<H256>>)>>,
pub max_block_range: Option<U64>,
}

impl JsonRpcCachedProvider {
pub fn new(provider: Provider<RetryClient<Http>>) -> Self {
JsonRpcCachedProvider { provider: Arc::new(provider), cache: Mutex::new(None) }
pub fn new(provider: Provider<RetryClient<Http>>, max_block_range: Option<U64>) -> Self {
JsonRpcCachedProvider {
provider: Arc::new(provider),
cache: Mutex::new(None),
max_block_range,
}
}

pub async fn get_latest_block(&self) -> Result<Option<Arc<Block<H256>>>, ProviderError> {
Expand Down Expand Up @@ -73,6 +78,7 @@ pub enum RetryClientError {
pub fn create_client(
rpc_url: &str,
compute_units_per_second: Option<u64>,
max_block_range: Option<U64>,
) -> Result<Arc<JsonRpcCachedProvider>, RetryClientError> {
let url = Url::parse(rpc_url).map_err(|e| {
RetryClientError::HttpProviderCantBeCreated(rpc_url.to_string(), e.to_string())
Expand All @@ -87,7 +93,7 @@ pub fn create_client(
.initial_backoff(Duration::from_millis(500))
.build(provider, Box::<ethers::providers::HttpRateLimitRetryPolicy>::default()),
);
Ok(Arc::new(JsonRpcCachedProvider::new(instance)))
Ok(Arc::new(JsonRpcCachedProvider::new(instance, max_block_range)))
}

pub async fn get_chain_id(rpc_url: &str) -> Result<U256, ProviderError> {
Expand All @@ -107,7 +113,11 @@ impl CreateNetworkProvider {
pub fn create(manifest: &Manifest) -> Result<Vec<CreateNetworkProvider>, RetryClientError> {
let mut result: Vec<CreateNetworkProvider> = vec![];
for network in &manifest.networks {
let provider = create_client(&network.rpc, network.compute_units_per_second)?;
let provider = create_client(
&network.rpc,
network.compute_units_per_second,
network.max_block_range,
)?;
result.push(CreateNetworkProvider {
network_name: network.name.clone(),
client: provider,
Expand All @@ -125,14 +135,14 @@ mod tests {
#[test]
fn test_create_retry_client() {
let rpc_url = "http://localhost:8545";
let result = create_client(rpc_url, Some(660));
let result = create_client(rpc_url, Some(660), None);
assert!(result.is_ok());
}

#[test]
fn test_create_retry_client_invalid_url() {
let rpc_url = "invalid_url";
let result = create_client(rpc_url, Some(660));
let result = create_client(rpc_url, Some(660), None);
assert!(result.is_err());
if let Err(RetryClientError::HttpProviderCantBeCreated(url, _)) = result {
assert_eq!(url, rpc_url);
Expand Down
2 changes: 2 additions & 0 deletions documentation/docs/pages/docs/changelog.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
### Features
-------------------------------------------------

- feat: add max_block_range to networks - https://github.com/joshstevens19/rindexer/issues/55

### Bug fixes
-------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,27 @@ networks:
You can read more about environment variables in the [Environment Variables](/docs/start-building/yaml-config#environment-variables) section.
### max_block_range
:::info
This field is optional and will slow down indexing if applied, rindexer is fastest when you use a RPC provider who can predict the next block ranges when fetching logs.
You can read a bit more about RPC providers [here](/docs/references/rpc-node-providers#rpc-node-providers).
:::
Set the max block range for the network, this means when rindexer is fetching logs it will not fetch more than the max block range per request.
```yaml [rindexer.yaml]
name: rETHIndexer
description: My first rindexer project
repository: https://github.com/joshstevens19/rindexer
project_type: no-code
networks:
- name: ethereum
chain_id: 1
rpc: https://mainnet.gateway.tenderly.co
max_block_range: 10000 // [!code focus]
```
### compute_units_per_second
:::info
Expand Down
1 change: 1 addition & 0 deletions rindexer_rust_playground/rindexer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ networks:
- name: yominet
chain_id: 5264468217
rpc: https://yominet.rpc.caldera.xyz/http
max_block_range: 10000
storage:
postgres:
enabled: true
Expand Down
Loading

0 comments on commit 8ff14c1

Please sign in to comment.