Change threading model for higher parallelism and throughput #249

Merged · 28 commits · Feb 1, 2024
23 changes: 16 additions & 7 deletions rust/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions rust/Cargo.toml
@@ -27,6 +27,7 @@ ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.62"
async-trait = "0.1.53"
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "af0dcea7144225a709e4f595e58f8026b99e901c" }
kanal = { version = "0.1.0-pre8", features = ["async"] }
backtrace = "0.3.58"
base64 = "0.13.0"
bb8 = "0.8.1"
2 changes: 2 additions & 0 deletions rust/processor/Cargo.toml
@@ -35,6 +35,7 @@ gcloud-sdk = { workspace = true }
google-cloud-googleapis = { workspace = true }
google-cloud-pubsub = { workspace = true }
hex = { workspace = true }
kanal = { workspace = true }
once_cell = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
@@ -55,4 +56,5 @@ url = { workspace = true }

# Postgres SSL
native-tls = "0.2.11"
num_cpus = "1.16.0"
postgres-native-tls = "0.5.0"
@@ -1,2 +1,2 @@
-- Your SQL goes here
CREATE INDEX cdb_insat_index ON current_delegator_balances (inserted_at);
CREATE INDEX IF NOT EXISTS cdb_insat_index ON current_delegator_balances (inserted_at);
92 changes: 88 additions & 4 deletions rust/processor/src/grpc_stream.rs
@@ -12,6 +12,7 @@ use aptos_protos::indexer::v1::{
raw_data_client::RawDataClient, GetTransactionsRequest, TransactionsResponse,
};
use futures_util::StreamExt;
use kanal::AsyncSender;
use prost::Message;
use std::{sync::Arc, time::Duration};
use tonic::{Response, Streaming};
@@ -25,6 +26,10 @@ const GRPC_API_GATEWAY_API_KEY_HEADER: &str = "authorization";
const GRPC_REQUEST_NAME_HEADER: &str = "x-aptos-request-name";
/// GRPC connection id
const GRPC_CONNECTION_ID: &str = "x-aptos-connection-id";
/// Maximum number of times we will try to reconnect to GRPC in case the upstream connection is being updated
pub const RECONNECTION_MAX_RETRIES: u64 = 65;
/// 256MB
pub const MAX_RESPONSE_SIZE: usize = 1024 * 1024 * 256;

pub fn grpc_request_builder(
starting_version: u64,
@@ -98,8 +103,8 @@ pub async fn get_stream(
Ok(client) => client
.accept_compressed(tonic::codec::CompressionEncoding::Gzip)
.send_compressed(tonic::codec::CompressionEncoding::Gzip)
.max_decoding_message_size(crate::worker::MAX_RESPONSE_SIZE)
.max_encoding_message_size(crate::worker::MAX_RESPONSE_SIZE),
.max_decoding_message_size(MAX_RESPONSE_SIZE)
.max_encoding_message_size(MAX_RESPONSE_SIZE),
Err(e) => {
error!(
processor_name = processor_name,
@@ -130,14 +135,76 @@ pub async fn get_stream(
.expect("[Parser] Failed to get grpc response. Is the server running?")
}

pub async fn get_chain_id(
indexer_grpc_data_service_address: Url,
indexer_grpc_http2_ping_interval: Duration,
indexer_grpc_http2_ping_timeout: Duration,
auth_token: String,
processor_name: String,
) -> u64 {
info!(
processor_name = processor_name,
service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
stream_address = indexer_grpc_data_service_address.to_string(),
"[Parser] Connecting to GRPC stream to get chain id",
);
let response = get_stream(
indexer_grpc_data_service_address.clone(),
indexer_grpc_http2_ping_interval,
indexer_grpc_http2_ping_timeout,
1,
Some(2),
auth_token.clone(),
processor_name.to_string(),
)
.await;
let connection_id = match response.metadata().get(GRPC_CONNECTION_ID) {
Some(connection_id) => connection_id.to_str().unwrap().to_string(),
None => "".to_string(),
};
let mut resp_stream = response.into_inner();
info!(
processor_name = processor_name,
service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
stream_address = indexer_grpc_data_service_address.to_string(),
connection_id,
"[Parser] Successfully connected to GRPC stream to get chain id",
);

match resp_stream.next().await {
Some(Ok(r)) => r.chain_id.expect("[Parser] Chain Id doesn't exist."),
Some(Err(rpc_error)) => {
error!(
processor_name = processor_name,
service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
stream_address = indexer_grpc_data_service_address.to_string(),
connection_id,
error = ?rpc_error,
"[Parser] Error receiving datastream response for chain id"
);
panic!("[Parser] Error receiving datastream response for chain id");
},
None => {
error!(
processor_name = processor_name,
service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
stream_address = indexer_grpc_data_service_address.to_string(),
connection_id,
"[Parser] Stream ended before getting response fo for chain id"
);
panic!("[Parser] Stream ended before getting response fo for chain id");
},
}
}

/// Gets a batch of transactions from the stream. Batch size is set in the grpc server.
/// The number of batches depends on our config
/// There could be several special scenarios:
/// 1. If we lose the connection, we will try reconnecting X times within Y seconds before crashing.
/// 2. If we specified an end version and we hit that, we will stop fetching, but we will make sure that
/// all existing transactions are processed
pub async fn create_fetcher_loop(
txn_sender: tokio::sync::mpsc::Sender<TransactionsPBResponse>,
txn_sender: AsyncSender<TransactionsPBResponse>,
indexer_grpc_data_service_address: Url,
indexer_grpc_http2_ping_interval: Duration,
indexer_grpc_http2_ping_timeout: Duration,
@@ -184,6 +251,8 @@ pub async fn create_fetcher_loop(
"[Parser] Successfully connected to GRPC stream",
);

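// Track what has already been pulled off the stream so the loop below can detect a non-contiguous (gapped) batch from GRPC.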
let mut last_fetched_version = batch_start_version as i64 - 1;
let mut batch_start_version = batch_start_version;
loop {
let is_success = match resp_stream.next().await {
Some(Ok(r)) => {
@@ -224,6 +293,21 @@
"{}",
ProcessorStep::ReceivedTxnsFromGrpc.get_label(),
);

let current_fetched_version = start_version;
// TODO: FIX THIS NONSENSICAL ERROR
if last_fetched_version + 1 != current_fetched_version as i64 {
error!(
batch_start_version = batch_start_version,
last_fetched_version = last_fetched_version,
current_fetched_version = current_fetched_version,
"[Parser] Received batch with gap from GRPC stream"
);
// panic!("[Parser] Received batch with gap from GRPC stream");
}
last_fetched_version = end_version as i64;
batch_start_version = (last_fetched_version + 1) as u64;

LATEST_PROCESSED_VERSION
.with_label_values(&[
&processor_name,
@@ -382,7 +466,7 @@ pub async fn create_fetcher_loop(
// TODO: Turn this into exponential backoff
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

if reconnection_retries >= crate::worker::RECONNECTION_MAX_RETRIES {
if reconnection_retries >= RECONNECTION_MAX_RETRIES {
error!(
processor_name = processor_name,
service_type = crate::worker::PROCESSOR_SERVICE_TYPE,
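Together with the worker-side changes (which live in worker.rs and are not part of this diff), the fetcher loop above now pushes each TransactionsPBResponse into a kanal channel that several processing tasks drain concurrently. A sketch of that fan-out shape; the worker count, channel capacity, and the stand-in Batch type are assumptions for illustration, not code from the PR:

use kanal::bounded_async;

// Stand-in for TransactionsPBResponse; the real struct carries decoded transactions.
struct Batch {
    start_version: u64,
    end_version: u64,
}

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded_async::<Batch>(100);

    // One fetcher task: in the PR this role is played by create_fetcher_loop,
    // which owns the GRPC stream and sends every received batch into the channel.
    let fetcher = tokio::spawn(async move {
        for i in 0..10u64 {
            let batch = Batch { start_version: i * 1000, end_version: i * 1000 + 999 };
            if tx.send(batch).await.is_err() {
                break; // every receiver has been dropped
            }
        }
    });

    // N worker tasks, each with a cloned receiver; kanal is MPMC, so each batch
    // is delivered to exactly one worker.
    let workers: Vec<_> = (0..4)
        .map(|worker_id| {
            let rx = rx.clone();
            tokio::spawn(async move {
                while let Ok(batch) = rx.recv().await {
                    println!(
                        "worker {worker_id}: versions {}..={}",
                        batch.start_version, batch.end_version
                    );
                }
            })
        })
        .collect();
    drop(rx); // keep only the workers' clones so the channel closes once the fetcher exits

    fetcher.await.unwrap();
    for worker in workers {
        worker.await.unwrap();
    }
}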
25 changes: 20 additions & 5 deletions rust/processor/src/main.rs
@@ -6,9 +6,24 @@ use clap::Parser;
use processor::IndexerGrpcProcessorConfig;
use server_framework::ServerArgs;

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<()> {
let args = ServerArgs::parse();
args.run::<IndexerGrpcProcessorConfig>(tokio::runtime::Handle::current())
.await
const RUNTIME_WORKER_MULTIPLIER: usize = 2;

fn main() -> Result<()> {
let num_cpus = num_cpus::get();
let worker_threads = (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16);
println!(
"[Processor] Starting processor tokio runtime: num_cpus={}, worker_threads={}",
num_cpus, worker_threads
);

tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(worker_threads)
.build()
.unwrap()
.block_on(async {
let args = ServerArgs::parse();
args.run::<IndexerGrpcProcessorConfig>(tokio::runtime::Handle::current())
.await
})
}
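
As a quick check of the sizing formula above, worker_threads stays at the floor of 16 on machines with 8 or fewer cores and grows to num_cpus * 2 beyond that. A small sketch of the same arithmetic (the core counts are hypothetical):

const RUNTIME_WORKER_MULTIPLIER: usize = 2;

fn worker_threads(num_cpus: usize) -> usize {
    (num_cpus * RUNTIME_WORKER_MULTIPLIER).max(16)
}

fn main() {
    // 4 cores  -> 16 worker threads (the floor of 16 applies)
    // 8 cores  -> 16 worker threads
    // 16 cores -> 32 worker threads
    for cpus in [4, 8, 16] {
        println!("{cpus} cpus -> {} worker threads", worker_threads(cpus));
    }
}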
19 changes: 7 additions & 12 deletions rust/processor/src/processors/mod.rs
@@ -34,18 +34,12 @@ use self::{
token_v2_processor::TokenV2Processor,
user_transaction_processor::UserTransactionProcessor,
};
use crate::{
models::processor_status::ProcessorStatus,
schema::processor_status,
utils::{
counters::{GOT_CONNECTION_COUNT, UNABLE_TO_GET_CONNECTION_COUNT},
database::{execute_with_better_error, PgDbPool, PgPoolConnection},
util::parse_timestamp,
},
use crate::utils::{
counters::{GOT_CONNECTION_COUNT, UNABLE_TO_GET_CONNECTION_COUNT},
database::{PgDbPool, PgPoolConnection},
};
use aptos_protos::transaction::v1::Transaction as ProtoTransaction;
use async_trait::async_trait;
use diesel::{pg::upsert::excluded, prelude::*};
use enum_dispatch::enum_dispatch;
use serde::{Deserialize, Serialize};
use std::{fmt::Debug, sync::Arc};
@@ -115,9 +109,10 @@ pub trait ProcessorTrait: Send + Sync + Debug {
/// versions are successful because any gap would cause the processor to panic
async fn update_last_processed_version(
&self,
version: u64,
last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
_version: u64,
_last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>,
) -> anyhow::Result<()> {
/*
let timestamp = last_transaction_timestamp.map(|t| parse_timestamp(&t, version as i64));
let status = ProcessorStatus {
processor: self.name().to_string(),
@@ -139,7 +134,7 @@
)),
Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
)
.await?;
.await?;*/
Ok(())
}
}
10 changes: 10 additions & 0 deletions rust/processor/src/utils/counters.rs
@@ -145,6 +145,16 @@ pub static PROCESSED_BYTES_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// The amount of time that a thread spent waiting for a protobuf bundle of transactions
pub static PB_CHANNEL_FETCH_WAIT_TIME_SECS: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"indexer_processor_pb_channel_fetch_wait_time_secs",
"Count of bytes processed",
&["processor_name", "thread_number"]
)
.unwrap()
});

/// Count of transactions processed.
pub static NUM_TRANSACTIONS_PROCESSED_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
15 changes: 9 additions & 6 deletions rust/processor/src/utils/database.rs
@@ -45,7 +45,7 @@ pub const MAX_DIESEL_PARAM_SIZE: u16 = u16::MAX / 2;
/// we may need to chunk an array of items based on how many columns are in the table.
/// This function returns boundaries of chunks in the form of (start_index, end_index)
pub fn get_chunks(num_items_to_insert: usize, _column_count: usize) -> Vec<(usize, usize)> {
let max_item_size = 100; // MAX_DIESEL_PARAM_SIZE as usize / column_count;
let max_item_size = 40; // MAX_DIESEL_PARAM_SIZE as usize / column_count;
let mut chunk: (usize, usize) = (0, min(num_items_to_insert, max_item_size));
let mut chunks = vec![chunk];
while chunk.1 != num_items_to_insert {
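
The loop body is collapsed in this view, but assuming it advances the window in max_item_size steps (which is what the boundary semantics imply), dropping max_item_size from 100 to 40 produces more, smaller chunks. A hypothetical sketch of that behavior, not a test from the PR:

// Sketch of the assumed stepping; (start_index, end_index) with the end exclusive.
fn get_chunks_sketch(num_items_to_insert: usize, max_item_size: usize) -> Vec<(usize, usize)> {
    let mut chunk = (0, num_items_to_insert.min(max_item_size));
    let mut chunks = vec![chunk];
    while chunk.1 != num_items_to_insert {
        chunk = (chunk.0 + max_item_size, num_items_to_insert.min(chunk.1 + max_item_size));
        chunks.push(chunk);
    }
    chunks
}

fn main() {
    // With max_item_size = 40: smaller INSERT statements, more of them.
    assert_eq!(get_chunks_sketch(100, 40), vec![(0, 40), (40, 80), (80, 100)]);
    // With the previous max_item_size of 100, the same 100 items were a single chunk.
    assert_eq!(get_chunks_sketch(100, 100), vec![(0, 100)]);
}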
@@ -205,15 +205,17 @@
}

pub async fn execute_in_chunks<U, T>(
conn: PgDbPool,
build_query: fn(Vec<T>) -> (U, Option<&'static str>),
items_to_insert: &[T],
chunk_size: usize,
_conn: PgDbPool,
_build_query: fn(Vec<T>) -> (U, Option<&'static str>),
_items_to_insert: &[T],
_chunk_size: usize,
) -> Result<(), diesel::result::Error>
where
U: QueryFragment<diesel::pg::Pg> + diesel::query_builder::QueryId + Send,
T: serde::Serialize + for<'de> serde::Deserialize<'de> + Clone,
{
// TEMPORARILY SKIP ACTUALLY WRITING, SO WE CAN TEST THE FETCHING SPEED
/*
let chunks = get_chunks(items_to_insert.len(), chunk_size);

let futures = chunks.into_iter().map(|(start_ind, end_ind)| {
@@ -230,10 +232,11 @@
});
for res in futures_util::future::join_all(futures).await {
res?;
}
}*/
Ok(())
}

#[allow(dead_code)]
async fn execute_or_retry_cleaned<U, T>(
conn: PgDbPool,
build_query: fn(Vec<T>) -> (U, Option<&'static str>),