Skip to content

Commit

Permalink
Simplify poller (helius-labs#234)
Browse files Browse the repository at this point in the history
* Simplify poller

* Simplify poller
  • Loading branch information
pmantica11 authored Oct 18, 2024
1 parent 069ac2e commit c8a8c04
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 173 deletions.
10 changes: 5 additions & 5 deletions src/ingester/fetchers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use yellowstone_grpc_proto::solana::storage::confirmed_block::InnerInstructions;
use crate::api::method::get_indexer_health::HEALTH_CHECK_SLOT_DISTANCE;
use crate::common::typedefs::hash::Hash;
use crate::common::typedefs::rpc_client_with_uri::RpcClientWithUri;
use crate::ingester::fetchers::poller::get_poller_block_stream;
use crate::ingester::fetchers::poller::get_block_poller_stream;
use crate::ingester::typedefs::block_info::{
BlockInfo, BlockMetadata, Instruction, InstructionGroup, TransactionInfo,
};
Expand All @@ -48,7 +48,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
let grpc_stream = get_grpc_block_stream(endpoint, auth_header);
pin_mut!(grpc_stream);
let mut rpc_poll_stream: Option<Pin<Box<dyn Stream<Item = Vec<BlockInfo>> + Send>>> = Some(
Box::pin(get_poller_block_stream(
Box::pin(get_block_poller_stream(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
Expand Down Expand Up @@ -103,7 +103,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
statsd_count!("grpc_timeout", 1);
}
info!("gRPC stream timed out, enabling RPC block fetching");
rpc_poll_stream = Some(Box::pin(get_poller_block_stream(
rpc_poll_stream = Some(Box::pin(get_block_poller_stream(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
Expand All @@ -120,7 +120,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
statsd_count!("grpc_out_of_order", 1);
}
info!("Switching to RPC block fetching");
rpc_poll_stream = Some(Box::pin(get_poller_block_stream(
rpc_poll_stream = Some(Box::pin(get_block_poller_stream(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
Expand All @@ -132,7 +132,7 @@ pub fn get_grpc_stream_with_rpc_fallback(
metric! {
statsd_count!("grpc_stale", 1);
}
rpc_poll_stream = Some(Box::pin(get_poller_block_stream(
rpc_poll_stream = Some(Box::pin(get_block_poller_stream(
rpc_client.clone(),
last_indexed_slot,
max_concurrent_block_fetches,
Expand Down
4 changes: 2 additions & 2 deletions src/ingester/fetchers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub mod grpc;
pub mod poller;

use grpc::get_grpc_stream_with_rpc_fallback;
use poller::get_poller_block_stream;
use poller::get_block_poller_stream;

pub struct BlockStreamConfig {
pub rpc_client: Arc<RpcClientWithUri>,
Expand All @@ -34,7 +34,7 @@ impl BlockStreamConfig {
});

let poller_stream = if self.geyser_url.is_none() {
Some(get_poller_block_stream(
Some(get_block_poller_stream(
self.rpc_client.clone(),
self.last_indexed_slot,
self.max_concurrent_block_fetches,
Expand Down
213 changes: 51 additions & 162 deletions src/ingester/fetchers/poller.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use std::{
cmp::max,
collections::{BTreeMap, HashSet},
num::NonZeroUsize,
collections::BTreeMap,
sync::{atomic::Ordering, Arc},
time::Duration,
};

use async_stream::stream;
use cadence_macros::statsd_count;
use futures::{stream::FuturesUnordered, StreamExt};
use lru::LruCache;
use futures::{pin_mut, Stream, StreamExt};
use solana_client::{
client_error, nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig,
rpc_request::RpcError,
nonblocking::rpc_client::RpcClient, rpc_config::RpcBlockConfig, rpc_request::RpcError,
};

use solana_sdk::commitment_config::CommitmentConfig;
Expand All @@ -26,148 +22,58 @@ use crate::{
};

const SKIPPED_BLOCK_ERRORS: [i64; 2] = [-32007, -32009];
const RETRIES: u64 = 3;
const INFINITY: u64 = u64::MAX;

/// This function creates a stream that continuously fetches and emits blocks from a Solana blockchain.
/// It implements a concurrent block fetching algorithm with the following key features:
///
/// 1. Concurrent block fetching: It can fetch multiple blocks simultaneously up to a specified limit.
/// 2. Block caching: Fetched blocks are cached if they can't be immediately processed.
/// 3. Skipped slot handling: It keeps track of skipped slots to avoid unnecessary fetching attempts.
/// 4. Ordered block emission: It ensures blocks are emitted in the correct order, even if fetched out of order.
///
/// Algorithm overview:
/// - Initialize data structures for block fetching, caching, and tracking.
/// - Enter a loop that continues indefinitely:
/// a. Fetch the current latest slot.
/// b. Initiate new block fetches up to the concurrent limit.
/// c. Process completed block fetches:
/// - If the block is the next in sequence, emit it along with any cached blocks that follow.
/// - If not, cache the block and fetch its parent if necessary.
/// d. Refill the block fetching queue.
/// e. Brief sleep to allow other threads to update the latest slot.
///
/// This approach allows for efficient block fetching while maintaining the correct order of block processing.
pub fn get_poller_block_stream(
client: Arc<RpcClientWithUri>,
fn get_slot_stream(rpc_client: Arc<RpcClientWithUri>, start_slot: u64) -> impl Stream<Item = u64> {
stream! {
start_latest_slot_updater(rpc_client.clone()).await;
let mut next_slot_to_fetch = start_slot;
loop {
if next_slot_to_fetch > LATEST_SLOT.load(Ordering::SeqCst) {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
continue;
}
yield next_slot_to_fetch;
next_slot_to_fetch += 1;
}
}
}

pub fn get_block_poller_stream(
rpc_client: Arc<RpcClientWithUri>,
mut last_indexed_slot: u64,
max_concurrent_block_fetches: usize,
) -> impl futures::Stream<Item = Vec<BlockInfo>> {
) -> impl Stream<Item = Vec<BlockInfo>> {
stream! {
start_latest_slot_updater(client.clone()).await;
let mut block_fetching_futures = FuturesUnordered::new();
let mut block_cache: BTreeMap<u64, BlockInfo> = BTreeMap::new();
let mut in_process_slots = HashSet::new();
let mut next_slot_to_fetch = match last_indexed_slot {
let start_slot = match last_indexed_slot {
0 => 0,
last_indexed_slot => last_indexed_slot + 1
};
let mut skipped_slot_cache = LruCache::new(NonZeroUsize::new(1000).unwrap());


loop {
let current_slot = LATEST_SLOT.load(Ordering::SeqCst);

// Refill the block fetching futures with new slots to fetch
//
// If we just continued from the last indexed slot + 1, we might get into an infinite retry loop when
// we encounter a large number of skipped slots. To avoid that, when we refill the block fetching
// futures, we continue from the max of the block fetched in the previous outer loop iteration and
// the last indexed slot + 1.
next_slot_to_fetch = max(next_slot_to_fetch, last_indexed_slot + 1);
for _ in 0..max_concurrent_block_fetches {
if next_slot_to_fetch > current_slot {
break;
}
if !skipped_slot_cache.contains(&next_slot_to_fetch) && is_slot_unprocessed(next_slot_to_fetch, &in_process_slots, &block_cache, last_indexed_slot) {
block_fetching_futures.push(fetch_block(
client.uri.clone(),
next_slot_to_fetch,
RETRIES
));
in_process_slots.insert(next_slot_to_fetch);
}
next_slot_to_fetch += 1;
let slot_stream = get_slot_stream(rpc_client.clone(), start_slot);
pin_mut!(slot_stream);
let block_stream = slot_stream
.map(|slot| {
let rpc_client = rpc_client.clone();
async move { fetch_block_with_infinite_retries(rpc_client.uri.clone(), slot).await }
})
.buffer_unordered(max_concurrent_block_fetches);
pin_mut!(block_stream);
let mut block_cache: BTreeMap<u64, BlockInfo> = BTreeMap::new();
while let Some(block) = block_stream.next().await {
if let Some(block) = block {
block_cache.insert(block.metadata.slot, block);
}

while let Some(block) = block_fetching_futures.next().await {
let (block_result, slot) = block;
in_process_slots.remove(&slot);
if let Ok(block) = block_result {
if let None = block {
skipped_slot_cache.push(slot, true);
continue;
}
let block = block.unwrap();

let current_slot = LATEST_SLOT.load(Ordering::SeqCst);

// If the block is the next block to index, emit it and the consecutive blocks in the block cache
if block.metadata.slot == 0 || block.metadata.parent_slot == last_indexed_slot {
last_indexed_slot = block.metadata.slot;
let mut blocks_to_index = vec![block];
let (cached_blocks_to_index, last_indexed_slot_from_cache) = pop_cached_blocks_to_index(&mut block_cache, last_indexed_slot);
last_indexed_slot = last_indexed_slot_from_cache;
blocks_to_index.extend(cached_blocks_to_index);
let blocks_to_index_len = blocks_to_index.len();
metric! {
statsd_count!("rpc_block_emitted", blocks_to_index_len as i64);
}
yield blocks_to_index;
}
else {
let parent_slot = block.metadata.parent_slot;

// If the parent block is not processed, fetch it
if is_slot_unprocessed(parent_slot, &in_process_slots, &block_cache, last_indexed_slot) {
block_fetching_futures.push(fetch_block(
client.uri.clone(),
parent_slot,
INFINITY
));
in_process_slots.insert(parent_slot);
}
block_cache.insert(block.metadata.slot, block.clone());
}

// Refill the block fetching futures with new slots to fetch
for next_slot in (last_indexed_slot + 1)..(slot + 1 + max_concurrent_block_fetches as u64) {
if in_process_slots.len() >= max_concurrent_block_fetches {
break;
}
if next_slot > current_slot {
break;
}
if !skipped_slot_cache.contains(&next_slot) && is_slot_unprocessed(next_slot, &in_process_slots, &block_cache, last_indexed_slot) {
block_fetching_futures.push(fetch_block(
client.uri.clone(),
next_slot,
RETRIES
));
in_process_slots.insert(next_slot);
}
}
}
let (blocks_to_index, last_indexed_slot_from_cache) = pop_cached_blocks_to_index(&mut block_cache, last_indexed_slot);
last_indexed_slot = last_indexed_slot_from_cache;
metric! {
statsd_count!("rpc_block_emitted", blocks_to_index.len() as i64);
}
if !blocks_to_index.is_empty() {
yield blocks_to_index;
}
// Sleep to give the chance to other thread to update LATEST_SLOT
tokio::time::sleep(std::time::Duration::from_millis(10)).await;

}
}
}

fn is_slot_unprocessed(
slot: u64,
in_process_slots: &HashSet<u64>,
block_cache: &BTreeMap<u64, BlockInfo>,
last_indexed_slot: u64,
) -> bool {
!in_process_slots.contains(&slot)
&& !block_cache.contains_key(&slot)
&& slot > last_indexed_slot
}

fn pop_cached_blocks_to_index(
block_cache: &mut BTreeMap<u64, BlockInfo>,
mut last_indexed_slot: u64,
Expand Down Expand Up @@ -196,14 +102,11 @@ fn pop_cached_blocks_to_index(
(blocks, last_indexed_slot)
}

pub async fn fetch_block(
rpc_uri: String,
slot: u64,
retries: u64,
) -> (Result<Option<BlockInfo>, client_error::ClientError>, u64) {
pub async fn fetch_block_with_infinite_retries(rpc_uri: String, slot: u64) -> Option<BlockInfo> {
let mut attempt_counter = 0;
loop {
let timeout_sec = if attempt_counter <= 1 { 5 } else { 30 };
attempt_counter += 1;
let client = RpcClient::new_with_timeout_and_commitment(
rpc_uri.clone(),
Duration::from_secs(timeout_sec),
Expand All @@ -226,38 +129,24 @@ pub async fn fetch_block(
metric! {
statsd_count!("rpc_block_fetched", 1);
}
return (
Ok(Some(parse_ui_confirmed_blocked(block, slot).unwrap())),
slot,
);
return Some(parse_ui_confirmed_blocked(block, slot).unwrap());
}
Err(e) => {
if let solana_client::client_error::ClientErrorKind::RpcError(
RpcError::RpcResponseError { code, .. },
) = e.kind
{
if SKIPPED_BLOCK_ERRORS.contains(&code) {
if retries == INFINITY {
log::error!(
"Got skipped block error for block that is supposed to exist: {}",
slot
);
} else {
metric! {
statsd_count!("rpc_skipped_block", 1);
}
log::info!("Skipped block: {}", slot);
metric! {
statsd_count!("rpc_skipped_block", 1);
}
return (Ok(None), slot);
log::info!("Skipped block: {}", slot);
return None;
}
}
log::debug!("Failed to fetch block: {}. {}", slot, e.to_string());
attempt_counter += 1;
if attempt_counter >= retries {
metric! {
statsd_count!("rpc_block_fetch_failed", 1);
}
return (Err(e), slot);
metric! {
statsd_count!("rpc_block_fetch_failed", 1);
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use cadence_macros::{statsd_count, statsd_gauge};
use log::error;
use log::{error, info};
use once_cell::sync::Lazy;
use sea_orm::{ColumnTrait, DatabaseConnection, EntityTrait, QueryFilter};
use solana_client::nonblocking::rpc_client::RpcClient;
Expand All @@ -28,13 +28,10 @@ use light_concurrent_merkle_tree::copy::ConcurrentMerkleTreeCopy;
use light_concurrent_merkle_tree::light_hasher::Poseidon;
use light_sdk::state::MerkleTreeMetadata;


use crate::common::typedefs::hash::Hash;


use solana_sdk::account::Account as SolanaAccount;


use solana_sdk::pubkey::Pubkey;
use std::mem;
const CHUNK_SIZE: usize = 100;
Expand Down Expand Up @@ -73,6 +70,7 @@ pub fn continously_monitor_photon(
if lag < HEALTH_CHECK_SLOT_DISTANCE as u64 {
has_been_healthy = true;
}
info!("Indexing lag: {}", lag);
if has_been_healthy && lag > HEALTH_CHECK_SLOT_DISTANCE as u64 {
error!("Indexing lag is too high: {}", lag);
continue;
Expand Down

0 comments on commit c8a8c04

Please sign in to comment.