diff --git a/Cargo.lock b/Cargo.lock index c5e3eb2a4..fc7d6b751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1623,10 +1623,8 @@ dependencies = [ "anchor-client", "anchor-lang", "anyhow", - "async-trait", "backon", "base64 0.21.4", - "blockbuster", "borsh 0.10.3", "bs58 0.4.0", "cadence", @@ -1639,15 +1637,12 @@ dependencies = [ "flatbuffers", "futures", "futures-util", - "hyper", "indicatif", "lazy_static", "log", "mpl-bubblegum", - "num-traits", "plerkle_messenger", "plerkle_serialization", - "rand 0.8.5", "redis", "regex", "reqwest", @@ -1658,20 +1653,14 @@ dependencies = [ "serde_json", "solana-account-decoder", "solana-client", - "solana-geyser-plugin-interface", "solana-sdk", - "solana-sdk-macro", "solana-transaction-status", "spl-account-compression", "spl-concurrent-merkle-tree", - "spl-token 4.0.0", "sqlx", - "stretto", "thiserror", "tokio", "tokio-postgres", - "tokio-stream", - "tracing-subscriber", "url", "uuid", ] diff --git a/tree_backfiller/Cargo.toml b/tree_backfiller/Cargo.toml index 8f5a6ba73..631c0ed16 100644 --- a/tree_backfiller/Cargo.toml +++ b/tree_backfiller/Cargo.toml @@ -32,7 +32,6 @@ thiserror = "1.0.31" serde_json = "1.0.81" cadence = "0.29.0" cadence-macros = "0.29.0" -hyper = "0.14.23" anchor-client = "0.28.0" tokio = { version = "1.26.0", features = ["full", "tracing"] } sqlx = { version = "0.6.2", features = [ @@ -69,29 +68,15 @@ mpl-bubblegum = "1.0.1-beta.3" spl-account-compression = { version = "0.2.0", features = ["no-entrypoint"] } spl-concurrent-merkle-tree = "0.2.0" uuid = "1.0.0" -async-trait = "0.1.53" -num-traits = "0.2.15" -blockbuster = "0.9.0-beta.1" figment = { version = "0.10.6", features = ["env", "toml", "yaml"] } solana-sdk = "~1.16.16" solana-client = "~1.16.16" -spl-token = { version = ">= 3.5.0, < 5.0", features = ["no-entrypoint"] } solana-transaction-status = "~1.16.16" solana-account-decoder = "~1.16.16" -solana-geyser-plugin-interface = "~1.16.16" -solana-sdk-macro = "~1.16.16" -rand = "0.8.5" rust-crypto = 
"0.2.36" url = "2.3.1" anchor-lang = "0.28.0" borsh = "~0.10.3" -stretto = { version = "0.7", features = ["async"] } -tokio-stream = "0.1.12" -tracing-subscriber = { version = "0.3.16", features = [ - "json", - "env-filter", - "ansi", -] } clap = { version = "4.2.2", features = ["derive", "cargo", "env"] } [lints] diff --git a/tree_backfiller/README.md b/tree_backfiller/README.md index 6a3e135df..f9d5958bc 100644 --- a/tree_backfiller/README.md +++ b/tree_backfiller/README.md @@ -1,6 +1,6 @@ # Tree Backfiller -The Tree Backfiller crawls all trees on-chain and backfills any transactions related to a tree that have not already been observed. +The tree backfiller CLI assists in detecting and indexing missing updates within a compression tree that are managed by the MPL bubblegum program. ## Commands @@ -10,34 +10,59 @@ Command line arguments can also be set through environment variables. The `run` command initiates the crawling and backfilling process. It requires the Solana RPC URL, the database URL, and the messenger Redis URL. 
+```mermaid +flowchart + start((Start)) -->init[Initialize RPC, DB] + init --> fetchTreesDB[Fetch Trees from DB] + fetchTreesDB --> findGapsDB[Find Gaps in DB] + findGapsDB --> enqueueGapFills[Enqueue Gap Fills] + enqueueGapFills --> gapWorkerManager[Gap Worker Manager] + gapWorkerManager --> crawlSignatures[Crawl Solana RPC] + crawlSignatures --> enqueueSignatures[Enqueue Signatures] + enqueueSignatures --> transactionWorkerManager[Transaction Worker Manager] + transactionWorkerManager --> fetchTransactionsRPC[Fetch Transactions RPC] + fetchTransactionsRPC --> processTransactions[Push Transaction to Messenger] + processTransactions ---> Finished ``` -Usage: das-tree-backfiller run [OPTIONS] --solana-rpc-url --database-url --messenger-redis-url + +``` +Usage: das-tree-backfiller run [OPTIONS] --database-url --messenger-redis-url --solana-rpc-url Options: - --solana-rpc-url - Solana RPC URL [env: SOLANA_RPC_URL=] --tree-crawler-count - Number of tree crawler workers [env: TREE_CRAWLER_COUNT=] [default: 100] + Number of tree crawler workers [env: TREE_CRAWLER_COUNT=] [default: 20] --signature-channel-size - The size of the signature channel. This is the number of signatures that can be queued up. [env: SIGNATURE_CHANNEL_SIZE=] [default: 10000] - --queue-channel-size - [env: QUEUE_CHANNEL_SIZE=] [default: 1000] + The size of the signature channel [env: SIGNATURE_CHANNEL_SIZE=] [default: 10000] + --gap-channel-size + The size of the gap channel [env: GAP_CHANNEL_SIZE=] [default: 1000] + --transaction-worker-count + The number of transaction workers [env: TRANSACTION_WORKER_COUNT=] [default: 100] + --gap-worker-count + The number of gap workers [env: GAP_WORKER_COUNT=] [default: 25] + --only-trees + The list of trees to crawl. 
If not specified, all trees will be crawled [env: ONLY_TREES=] --database-url - [env: DATABASE_URL=postgres://solana:solana@localhost:5432/solana] + The database URL [env: DATABASE_URL=] --database-max-connections - [env: DATABASE_MAX_CONNECTIONS=] [default: 125] + The maximum number of connections to the database [env: DATABASE_MAX_CONNECTIONS=] [default: 125] --database-min-connections - [env: DATABASE_MIN_CONNECTIONS=] [default: 5] + The minimum number of connections to the database [env: DATABASE_MIN_CONNECTIONS=] [default: 5] --messenger-redis-url - [env: MESSENGER_REDIS_URL=redis://localhost:6379] + [env: MESSENGER_REDIS_URL=] --messenger-redis-batch-size [env: MESSENGER_REDIS_BATCH_SIZE=] [default: 100] - --messenger-stream-max-buffer-size - [env: MESSENGER_STREAM_MAX_BUFFER_SIZE=] [default: 10000000] + --messenger-queue-connections + [env: MESSENGER_QUEUE_CONNECTIONS=] [default: 25] + --messenger-queue-stream + [env: MESSENGER_QUEUE_STREAM=] [default: TXNFILL] --metrics-host [env: METRICS_HOST=] [default: 127.0.0.1] --metrics-port [env: METRICS_PORT=] [default: 8125] + --metrics-prefix + [env: METRICS_PREFIX=] [default: das.backfiller] + --solana-rpc-url + [env: SOLANA_RPC_URL=] -h, --help Print help ``` @@ -49,8 +74,11 @@ The Tree Backfiller provides several metrics for monitoring performance and stat Metric | Description --- | --- transaction.failed | Count of failed transaction +transaction.succeeded | Count of successfully queued transaction transaction.queued | Time for a transaction to be queued +gap.failed | Count of failed gap crawling +gap.succeeded | Count of successfully crawled gaps +gap.queued | Time for a gap to be queued +tree.succeeded | Count of completed tree crawl tree.crawled | Time to crawl a tree -tree.completed | Count of completed tree crawl -tree.failed | Count of failed tree crawls job.completed | Time to complete the job diff --git a/tree_backfiller/src/backfiller.rs b/tree_backfiller/src/backfiller.rs index 4ec028e6f..ac10e5bc0 
100644 --- a/tree_backfiller/src/backfiller.rs +++ b/tree_backfiller/src/backfiller.rs @@ -1,23 +1,23 @@ use crate::{ db, metrics::{setup_metrics, MetricsArgs}, - queue, + queue::{QueueArgs, QueuePool}, rpc::{Rpc, SolanaRpcArgs}, - tree::{self, TreeGapFill, TreeGapModel}, + tree::{TreeErrorKind, TreeGapFill, TreeGapModel, TreeResponse}, }; use anyhow::Result; use cadence_macros::{statsd_count, statsd_time}; use clap::{Parser, ValueEnum}; use digital_asset_types::dao::cl_audits_v2; +use flatbuffers::FlatBufferBuilder; +use futures::{stream::FuturesUnordered, StreamExt}; use indicatif::HumanDuration; use log::{error, info}; +use plerkle_serialization::serializer::seralize_encoded_transaction_with_status; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QueryOrder, SqlxPostgresConnector}; use solana_sdk::signature::Signature; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::time::Duration; use std::time::Instant; -use tokio::sync::{mpsc, Semaphore}; +use tokio::sync::mpsc; #[derive(Debug, Parser, Clone, ValueEnum, PartialEq, Eq)] pub enum CrawlDirection { @@ -31,33 +31,33 @@ pub struct Args { #[arg(long, env, default_value = "20")] pub tree_crawler_count: usize, - /// The size of the signature channel. This is the number of signatures that can be queued up. + /// The size of the signature channel. #[arg(long, env, default_value = "10000")] pub signature_channel_size: usize, - /// The size of the signature channel. This is the number of signatures that can be queued up. + /// The size of the signature channel. #[arg(long, env, default_value = "1000")] pub gap_channel_size: usize, + /// The number of transaction workers. #[arg(long, env, default_value = "100")] pub transaction_worker_count: usize, + /// The number of gap workers. #[arg(long, env, default_value = "25")] pub gap_worker_count: usize, + /// The list of trees to crawl. If not specified, all trees will be crawled. 
#[arg(long, env, use_value_delimiter = true)] pub only_trees: Option>, - #[arg(long, env, default_value = "forward")] - pub crawl_direction: CrawlDirection, - /// Database configuration #[clap(flatten)] pub database: db::PoolArgs, /// Redis configuration #[clap(flatten)] - pub queue: queue::QueueArgs, + pub queue: QueueArgs, /// Metrics configuration #[clap(flatten)] @@ -68,68 +68,35 @@ pub struct Args { pub solana: SolanaRpcArgs, } -/// A thread-safe counter. -pub struct Counter(Arc); - -impl Counter { - /// Creates a new counter initialized to zero. - pub fn new() -> Self { - Self(Arc::new(AtomicUsize::new(0))) - } - - /// Increments the counter by one. - pub fn increment(&self) { - self.0.fetch_add(1, Ordering::SeqCst); - } - - /// Decrements the counter by one. - pub fn decrement(&self) { - self.0.fetch_sub(1, Ordering::SeqCst); - } - - /// Returns the current value of the counter. - pub fn get(&self) -> usize { - self.0.load(Ordering::SeqCst) - } - - /// Returns a future that resolves when the counter reaches zero. - /// The future periodically checks the counter value and sleeps for a short duration. - pub fn zero(&self) -> impl std::future::Future { - let counter = self.clone(); - async move { - while counter.get() > 0 { - tokio::time::sleep(Duration::from_millis(100)).await; - } - } - } -} - -impl Clone for Counter { - /// Returns a clone of the counter. - /// The returned counter shares the same underlying atomic integer. - fn clone(&self) -> Self { - Self(Arc::clone(&self.0)) - } -} - -/// Runs the backfilling process for trees. +/// Runs the backfilling process for the tree crawler. /// -/// This function initializes the necessary components such as the Solana RPC client, -/// database connection, metrics, and worker queues. It then fetches all trees and -/// starts the crawling process for each tree in parallel, respecting the configured -/// concurrency limits. It also listens for signatures and processes transactions -/// concurrently. 
After crawling all trees, it completes the transaction handling -/// and logs the total time taken for the job. +/// This function initializes the necessary components for the backfilling process, +/// including database connections, RPC clients, and worker managers for handling +/// transactions and gaps. It then proceeds to fetch the trees that need to be crawled +/// and manages the crawling process across multiple workers. +/// +/// The function handles the following major tasks: +/// - Establishing connections to the database and initializing RPC clients. +/// - Setting up channels for communication between different parts of the system. +/// - Spawning worker managers for processing transactions and gaps. +/// - Fetching trees from the database and managing their crawling process. +/// - Reporting metrics and logging information throughout the process. /// /// # Arguments /// -/// * `config` - The configuration settings for the backfiller, including RPC URLs, -/// database settings, and worker counts. +/// * `config` - A configuration object containing settings for the backfilling process, +/// including database, RPC, and worker configurations. /// /// # Returns /// /// This function returns a `Result` which is `Ok` if the backfilling process completes -/// successfully, or an `Error` if any part of the process fails. +/// successfully, or an `Err` with an appropriate error message if any part of the process +/// fails. +/// +/// # Errors +/// +/// This function can return errors related to database connectivity, RPC failures, +/// or issues with spawning and managing worker tasks. 
pub async fn run(config: Args) -> Result<()> { let pool = db::connect(config.database).await?; @@ -140,33 +107,28 @@ pub async fn run(config: Args) -> Result<()> { setup_metrics(config.metrics)?; let (sig_sender, mut sig_receiver) = mpsc::channel::(config.signature_channel_size); + let gap_sig_sender = sig_sender.clone(); let (gap_sender, mut gap_receiver) = mpsc::channel::(config.gap_channel_size); - let gap_count = Counter::new(); - let gap_worker_gap_count = gap_count.clone(); + let queue = QueuePool::try_from_config(config.queue).await?; - let transaction_count = Counter::new(); - let transaction_worker_transaction_count = transaction_count.clone(); + let transaction_worker_count = config.transaction_worker_count; - let queue = queue::QueuePool::try_from_config(config.queue).await?; - - tokio::spawn(async move { - let semaphore = Arc::new(Semaphore::new(config.transaction_worker_count)); + let transaction_worker_manager = tokio::spawn(async move { + let mut handlers = FuturesUnordered::new(); while let Some(signature) = sig_receiver.recv().await { + if handlers.len() >= transaction_worker_count { + handlers.next().await; + } + let solana_rpc = transaction_solana_rpc.clone(); let queue = queue.clone(); - let semaphore = semaphore.clone(); - let count = transaction_worker_transaction_count.clone(); - - count.increment(); - - tokio::spawn(async move { - let _permit = semaphore.acquire().await?; + let handle = tokio::spawn(async move { let timing = Instant::now(); - if let Err(e) = tree::transaction(&solana_rpc, queue, signature).await { + if let Err(e) = queue_transaction(&solana_rpc, queue, signature).await { error!("tree transaction: {:?}", e); statsd_count!("transaction.failed", 1); } else { @@ -174,30 +136,28 @@ pub async fn run(config: Args) -> Result<()> { } statsd_time!("transaction.queued", timing.elapsed()); - - count.decrement(); - - Ok::<(), anyhow::Error>(()) }); + + handlers.push(handle); } - Ok::<(), anyhow::Error>(()) + 
futures::future::join_all(handlers).await; }); - tokio::spawn(async move { - let semaphore = Arc::new(Semaphore::new(config.gap_worker_count)); + let gap_worker_count = config.gap_worker_count; - while let Some(gap) = gap_receiver.recv().await { - let solana_rpc = gap_solana_rpc.clone(); - let sig_sender = sig_sender.clone(); - let semaphore = semaphore.clone(); - let count = gap_worker_gap_count.clone(); + let gap_worker_manager = tokio::spawn(async move { + let mut handlers = FuturesUnordered::new(); - count.increment(); + while let Some(gap) = gap_receiver.recv().await { + if handlers.len() >= gap_worker_count { + handlers.next().await; + } - tokio::spawn(async move { - let _permit = semaphore.acquire().await?; + let solana_rpc = gap_solana_rpc.clone(); + let sig_sender = gap_sig_sender.clone(); + let handle = tokio::spawn(async move { let timing = Instant::now(); if let Err(e) = gap.crawl(&solana_rpc, sig_sender).await { @@ -209,23 +169,22 @@ pub async fn run(config: Args) -> Result<()> { } statsd_time!("gap.queued", timing.elapsed()); - - count.decrement(); - - Ok::<(), anyhow::Error>(()) }); + + handlers.push(handle); } - Ok::<(), anyhow::Error>(()) + futures::future::join_all(handlers).await; }); let started = Instant::now(); let trees = if let Some(only_trees) = config.only_trees { - tree::TreeResponse::find(&solana_rpc, only_trees).await? + TreeResponse::find(&solana_rpc, only_trees).await? } else { - tree::TreeResponse::all(&solana_rpc).await? + TreeResponse::all(&solana_rpc).await? 
}; + let tree_count = trees.len(); info!( @@ -234,18 +193,19 @@ pub async fn run(config: Args) -> Result<()> { HumanDuration(started.elapsed()) ); - let semaphore = Arc::new(Semaphore::new(config.tree_crawler_count)); - let mut crawl_handles = Vec::with_capacity(tree_count); + let tree_crawler_count = config.tree_crawler_count; + let mut crawl_handles = FuturesUnordered::new(); for tree in trees { - let semaphore = semaphore.clone(); + if crawl_handles.len() >= tree_crawler_count { + crawl_handles.next().await; + } + let gap_sender = gap_sender.clone(); let pool = pool.clone(); let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pool); let crawl_handle = tokio::spawn(async move { - let _permit = semaphore.acquire().await?; - let timing = Instant::now(); let mut gaps = TreeGapModel::find(&conn, tree.pubkey) @@ -281,13 +241,14 @@ pub async fn run(config: Args) -> Result<()> { gaps.push(TreeGapFill::new(tree.pubkey, None, None)); } - if let Some(lower_seq) = lower_known_seq { + if let Some(lower_seq) = lower_known_seq.filter(|seq| seq.seq > 1) { let signature = Signature::try_from(lower_seq.tx.as_ref())?; info!( "tree {} has known lowest seq {} filling tree starting at {}", tree.pubkey, lower_seq.seq, signature ); + gaps.push(TreeGapFill::new(tree.pubkey, Some(signature), None)); } @@ -312,12 +273,14 @@ pub async fn run(config: Args) -> Result<()> { } futures::future::try_join_all(crawl_handles).await?; + drop(gap_sender); info!("crawled all trees"); - gap_count.zero().await; - info!("all gaps queued"); + gap_worker_manager.await?; + drop(sig_sender); + info!("all gaps processed"); - transaction_count.zero().await; + transaction_worker_manager.await?; info!("all transactions queued"); statsd_time!("job.completed", started.elapsed()); @@ -330,3 +293,17 @@ pub async fn run(config: Args) -> Result<()> { Ok(()) } + +async fn queue_transaction<'a>( + client: &Rpc, + queue: QueuePool, + signature: Signature, +) -> Result<(), TreeErrorKind> { + let transaction = 
client.get_transaction(&signature).await?; + + let message = seralize_encoded_transaction_with_status(FlatBufferBuilder::new(), transaction)?; + + queue.push(message.finished_data()).await?; + + Ok(()) +} diff --git a/tree_backfiller/src/db.rs b/tree_backfiller/src/db.rs index fd718113c..34db03e93 100644 --- a/tree_backfiller/src/db.rs +++ b/tree_backfiller/src/db.rs @@ -7,10 +7,13 @@ use sqlx::{ #[derive(Debug, Parser, Clone)] pub struct PoolArgs { + /// The database URL. #[arg(long, env)] pub database_url: String, + /// The maximum number of connections to the database. #[arg(long, env, default_value = "125")] pub database_max_connections: u32, + /// The minimum number of connections to the database. #[arg(long, env, default_value = "5")] pub database_min_connections: u32, } diff --git a/tree_backfiller/src/main.rs b/tree_backfiller/src/main.rs index 62c389f03..2fd4bb623 100644 --- a/tree_backfiller/src/main.rs +++ b/tree_backfiller/src/main.rs @@ -13,7 +13,6 @@ struct Args { enum Command { /// The 'run' command is used to cross-reference the index against on-chain accounts. /// It crawls through trees and backfills any missed tree transactions. - /// This is particularly useful for ensuring data consistency and completeness. 
#[command(name = "run")] Run(backfiller::Args), } diff --git a/tree_backfiller/src/metrics.rs b/tree_backfiller/src/metrics.rs index a5ac4f4f6..9c0d3c531 100644 --- a/tree_backfiller/src/metrics.rs +++ b/tree_backfiller/src/metrics.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient, Timed}; +use cadence::{BufferedUdpMetricSink, QueuingMetricSink, StatsdClient}; use cadence_macros::set_global_default; use clap::Parser; use std::net::UdpSocket; diff --git a/tree_backfiller/src/queue.rs b/tree_backfiller/src/queue.rs index 01588edc3..5d1deb4b2 100644 --- a/tree_backfiller/src/queue.rs +++ b/tree_backfiller/src/queue.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use tokio::sync::mpsc; use tokio::sync::{mpsc::error::TrySendError, Mutex}; -const TRANSACTION_BACKFILL_STREAM: &'static str = "TXNFILL"; +const TRANSACTION_BACKFILL_STREAM: &str = "TXNFILL"; #[derive(Clone, Debug, Parser)] pub struct QueueArgs { @@ -40,7 +40,7 @@ impl From for MessengerConfig { Self { messenger_type: MessengerType::Redis, - connection_config: connection_config, + connection_config, } } } diff --git a/tree_backfiller/src/rpc.rs b/tree_backfiller/src/rpc.rs index 361d88e17..0b4856401 100644 --- a/tree_backfiller/src/rpc.rs +++ b/tree_backfiller/src/rpc.rs @@ -49,7 +49,6 @@ impl Rpc { commitment: Some(CommitmentConfig { commitment: CommitmentLevel::Finalized, }), - ..RpcTransactionConfig::default() }, ) .await diff --git a/tree_backfiller/src/tree.rs b/tree_backfiller/src/tree.rs index 07f05bc3f..ba55b6b01 100644 --- a/tree_backfiller/src/tree.rs +++ b/tree_backfiller/src/tree.rs @@ -1,9 +1,6 @@ use anyhow::Result; use borsh::BorshDeserialize; use clap::Args; -use flatbuffers::FlatBufferBuilder; -use log::error; -use plerkle_serialization::serializer::seralize_encoded_transaction_with_status; use sea_orm::{DatabaseConnection, DbBackend, FromQueryResult, Statement, Value}; use solana_client::rpc_filter::{Memcmp, RpcFilterType}; use 
solana_sdk::{account::Account, pubkey::Pubkey, signature::Signature}; @@ -15,10 +12,7 @@ use std::str::FromStr; use thiserror::Error as ThisError; use tokio::sync::mpsc::Sender; -use crate::{ - queue::{QueuePool, QueuePoolError}, - rpc::Rpc, -}; +use crate::{queue::QueuePoolError, rpc::Rpc}; const GET_SIGNATURES_FOR_ADDRESS_LIMIT: usize = 1000; @@ -256,7 +250,7 @@ impl TreeResponse { let accounts = client.get_multiple_accounts(batch).await?; let results: Vec<(&Pubkey, Option)> = - batch.into_iter().zip(accounts).collect(); + batch.iter().zip(accounts).collect(); Ok::<_, TreeErrorKind>(results) }) @@ -268,11 +262,7 @@ impl TreeResponse { .into_iter() .flatten() .filter_map(|(pubkey, account)| { - if let Some(account) = account { - Some(Self::try_from_rpc(*pubkey, account)) - } else { - None - } + account.map(|account| Self::try_from_rpc(*pubkey, account)) }) .collect::, _>>() .map_err(|_| TreeErrorKind::SerializeTreeResponse)?; @@ -280,17 +270,3 @@ impl TreeResponse { Ok(trees) } } - -pub async fn transaction<'a>( - client: &Rpc, - queue: QueuePool, - signature: Signature, -) -> Result<(), TreeErrorKind> { - let transaction = client.get_transaction(&signature).await?; - - let message = seralize_encoded_transaction_with_status(FlatBufferBuilder::new(), transaction)?; - - queue.push(message.finished_data()).await?; - - Ok(()) -}