Skip to content

Commit

Permalink
cdb compatible migrations
Browse files Browse the repository at this point in the history
Remove transactions and make migrations not async

lint

fmt

clippy

diesel async TLS sslrootcert

lint

lint

support optional tls

format

more log

parallel inserts

lint

bigger pool

pool size 200

try bigger buffer

try fixed 100 insert size

use ahash + update rust

smaller batches, bigger pool

increase pool size to 800

small refac for readability

increase buffer to 150

try batch size 20

back to 100 buffer

refactor grpc into separate file

lint

try 40mb buffers

insert of 10 again

ARC instead of cloning txns

lint

avoid another clone

try size 50

try 100

try 65

Change threading model for higher parallelism and throughput (#249)

Co-authored-by: jillxuu <[email protected]>

clean

cleanup

try 200 connections

coin processor spawn blocking

sleep well

ARC and consistent parallelism

database parallelism undo

no CDB compat

Use gap detector

Don't panic in gaps

TEMP CHANGE FOR LOAD TEST

send in chunks

gap detector bigger

parallel writes to db

try chunks of 40

5k gap

fix channel length

post load test cleanup

temporary execute in chunks

cleanup and comments

Add config for table chunk size

cleanup
  • Loading branch information
CapCap authored and rtso committed Feb 22, 2024
1 parent d5dc7a0 commit 2020f0c
Show file tree
Hide file tree
Showing 25 changed files with 1,486 additions and 1,130 deletions.
24 changes: 22 additions & 2 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ diesel = { version = "2.1", features = [
"chrono",
"postgres_backend",
"numeric",
"postgres",
"serde_json",
] }
diesel-async = { version = "0.4", features = ["postgres", "bb8", "tokio"] }
Expand All @@ -55,6 +56,7 @@ cloud-storage = { version = "0.11.1", features = ["global-client"] }
google-cloud-googleapis = "0.10.0"
google-cloud-pubsub = "0.18.0"
hex = "0.4.3"
itertools = "0.12.1"
kanal = { version = "0.1.0-pre8", features = ["async"] }
once_cell = "1.10.0"
num_cpus = "1.16.0"
Expand Down
1 change: 1 addition & 0 deletions rust/processor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ gcloud-sdk = { workspace = true }
google-cloud-googleapis = { workspace = true }
google-cloud-pubsub = { workspace = true }
hex = { workspace = true }
itertools = { workspace = true }
kanal = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
Expand Down
20 changes: 20 additions & 0 deletions rust/processor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use crate::{
gap_detector::DEFAULT_GAP_DETECTION_BATCH_SIZE, processors::ProcessorConfig, worker::Worker,
};
use ahash::AHashMap;
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use server_framework::RunnableConfig;
Expand All @@ -20,19 +21,36 @@ pub struct IndexerGrpcProcessorConfig {
#[serde(flatten)]
pub grpc_http2_config: IndexerGrpcHttp2Config,
pub auth_token: String,
// Version to start indexing from
pub starting_version: Option<u64>,
// Version to end indexing at
pub ending_version: Option<u64>,
// Number of tasks waiting to pull transaction batches from the channel and process them
pub number_concurrent_processing_tasks: Option<usize>,
// Size of the pool for writes/reads to the DB. Limits maximum number of queries in flight
pub db_pool_size: Option<u32>,
// Maximum number of batches "missing" before we assume we have an issue with gaps and abort
#[serde(default = "IndexerGrpcProcessorConfig::default_gap_detection_batch_size")]
pub gap_detection_batch_size: u64,
// Number of protobuf transactions to send per chunk to the processor tasks
#[serde(default = "IndexerGrpcProcessorConfig::default_pb_channel_txn_chunk_size")]
pub pb_channel_txn_chunk_size: usize,
// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2)
#[serde(default = "AHashMap::new")]
pub per_table_chunk_sizes: AHashMap<String, usize>,
pub enable_verbose_logging: Option<bool>,
}

impl IndexerGrpcProcessorConfig {
/// Default number of "missing" batches tolerated before the gap detector aborts;
/// mirrors `DEFAULT_GAP_DETECTION_BATCH_SIZE` from the gap_detector module.
pub const fn default_gap_detection_batch_size() -> u64 {
DEFAULT_GAP_DETECTION_BATCH_SIZE
}

/// Make the default very large on purpose so that by default it's not chunked
/// This prevents any unexpected changes in behavior
pub const fn default_pb_channel_txn_chunk_size() -> usize {
100_000
}
}

#[async_trait::async_trait]
Expand All @@ -49,6 +67,8 @@ impl RunnableConfig for IndexerGrpcProcessorConfig {
self.number_concurrent_processing_tasks,
self.db_pool_size,
self.gap_detection_batch_size,
self.pb_channel_txn_chunk_size,
self.per_table_chunk_sizes.clone(),
self.enable_verbose_logging,
)
.await
Expand Down
13 changes: 6 additions & 7 deletions rust/processor/src/gap_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use crate::{
};
use ahash::AHashMap;
use kanal::AsyncReceiver;
use std::sync::Arc;
use tracing::{error, info};

// Number of batches processed before gap detected
pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 50;
pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 100;
// Number of seconds between each processor status update
const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;

Expand Down Expand Up @@ -70,7 +69,7 @@ impl GapDetector {

pub async fn create_gap_detector_status_tracker_loop(
gap_detector_receiver: AsyncReceiver<ProcessingResult>,
processor: Arc<Processor>,
processor: Processor,
starting_version: u64,
gap_detection_batch_size: u64,
) {
Expand Down Expand Up @@ -117,7 +116,7 @@ pub async fn create_gap_detector_status_tracker_loop(
processor
.update_last_processed_version(
res_last_success_batch.end_version,
res_last_success_batch.last_transaction_timstamp.clone(),
res_last_success_batch.last_transaction_timestamp.clone(),
)
.await
.unwrap();
Expand All @@ -132,7 +131,7 @@ pub async fn create_gap_detector_status_tracker_loop(
error = ?e,
"[Parser] Gap detector task has panicked"
);
panic!();
panic!("[Parser] Gap detector task has panicked: {:?}", e);
},
}
}
Expand All @@ -152,7 +151,7 @@ mod test {
let result = ProcessingResult {
start_version: 100 + i * 100,
end_version: 199 + i * 100,
last_transaction_timstamp: None,
last_transaction_timestamp: None,
processing_duration_in_secs: 0.0,
db_insertion_duration_in_secs: 0.0,
};
Expand All @@ -167,7 +166,7 @@ mod test {
.process_versions(ProcessingResult {
start_version: 0,
end_version: 99,
last_transaction_timstamp: None,
last_transaction_timestamp: None,
processing_duration_in_secs: 0.0,
db_insertion_duration_in_secs: 0.0,
})
Expand Down
Loading

0 comments on commit 2020f0c

Please sign in to comment.