Skip to content

Commit

Permalink
cdb compatible migrations
Browse files Browse the repository at this point in the history
Remove transactions and make migrations not async

lint

fmt

clippy

diesel async TLS sslrootcert

lint

lint

support optional tls

format

more log

parallel inserst

lint

bigger pool

pool size 200

try bigger buffer

try fixed 100 insert size

use ahash + update rust

smaller batches, bigger pool

increase pool size to 800

small refac for readability

increase buffer to 150

try batch size 20

back to 100 buffer

refactor grpc into separate file

lint

try 40mb buffers

insert of 10 again

ARC instead of cloning txns

lint

avoid another clone

try size 50

try 100

tryp 65

Change threading model for higher parallelism and throughput (#249)

Co-authored-by: jillxuu <[email protected]>

clean

cleanup

try 200 connections

coin processor spawn blocking

sleep well

ARC and consistent parallelism

database parallelism undo

no CDB compat

Use gap detector

Don't panic in gaps

TEMP CHANGE FOR LOAD TEST

send in chunks

gap detector bigger

parallel writes to db

try chunks of 40

5k gap

fix channel length

post load test cleanup

temporary execute in chunks

cleanup and comments

Add config for table chunk size

cleanup
  • Loading branch information
CapCap committed Mar 11, 2024
1 parent cc764f8 commit 6e9c4ae
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
4 changes: 2 additions & 2 deletions rust/processor/src/gap_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use ahash::AHashMap;
use kanal::AsyncReceiver;
use tracing::{error, info};

// Size of a gap (in txn version) before gap detected
pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 500;
// Number of batches processed before gap detected
pub const DEFAULT_GAP_DETECTION_BATCH_SIZE: u64 = 100;
// Number of seconds between each processor status update
const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1;

Expand Down
43 changes: 21 additions & 22 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl Worker {
receiver_clone.clone(),
task_index,
)
.await;
.await;

let size_in_bytes = transactions_pb.size_in_bytes as f64;
let first_txn_version = transactions_pb
Expand All @@ -336,33 +336,21 @@ impl Worker {
let start_txn_timestamp = transactions_pb.start_txn_timestamp.clone();
let end_txn_timestamp = transactions_pb.end_txn_timestamp.clone();

let start_txn_timestamp_unix = start_txn_timestamp
.as_ref()
.map(timestamp_to_unixtime)
.unwrap_or_default();
let start_txn_timestamp_iso = start_txn_timestamp
.as_ref()
.map(timestamp_to_iso)
.unwrap_or_default();
let end_txn_timestamp_iso = end_txn_timestamp
.as_ref()
.map(timestamp_to_iso)
.unwrap_or_default();

let txn_channel_fetch_latency_sec =
txn_channel_fetch_latency.elapsed().as_secs_f64();

debug!(
processor_name = processor_name,
service_type = PROCESSOR_SERVICE_TYPE,
start_version = batch_first_txn_version,
end_version = batch_last_txn_version,
num_of_transactions =
(batch_last_txn_version - batch_first_txn_version) as i64 + 1,
first_txn_version,
batch_first_txn_version,
last_txn_version,
batch_last_txn_version,
num_of_transactions = (last_txn_version - first_txn_version) as i64 + 1,
size_in_bytes,
task_index,
duration_in_secs = txn_channel_fetch_latency_sec,
tps = (batch_last_txn_version as f64 - batch_first_txn_version as f64)
tps = (last_txn_version as f64 - first_txn_version as f64)
/ txn_channel_fetch_latency_sec,
bytes_per_sec = size_in_bytes / txn_channel_fetch_latency_sec,
"[Parser][T#{}] Successfully fetched transactions from channel.",
Expand Down Expand Up @@ -426,11 +414,22 @@ impl Worker {
let processing_time = processing_time.elapsed().as_secs_f64();

// We've processed things: do some data and metrics

ma.tick_now((last_txn_version - first_txn_version) + 1);
let tps = (ma.avg() * 1000.0) as u64;
let start_txn_timestamp_unix = start_txn_timestamp
.as_ref()
.map(timestamp_to_unixtime)
.unwrap_or_default();
let start_txn_timestamp_iso = start_txn_timestamp
.as_ref()
.map(timestamp_to_iso)
.unwrap_or_default();
let end_txn_timestamp_iso = end_txn_timestamp
.as_ref()
.map(timestamp_to_iso)
.unwrap_or_default();

let num_processed = (last_txn_version - first_txn_version) + 1;
ma.tick_now(num_processed);
let tps = (ma.avg() * 1000.0) as u64;

debug!(
processor_name = processor_name,
Expand Down

0 comments on commit 6e9c4ae

Please sign in to comment.