From da0c6d6c9576b388c6b21966156f657c746bb1d1 Mon Sep 17 00:00:00 2001 From: phoenix Date: Tue, 18 Jun 2024 16:29:57 -0400 Subject: [PATCH] [indexer] run all workflows on the same executor --- .../handlers/objects_snapshot_processor.rs | 56 ++-------------- crates/sui-indexer/src/indexer.rs | 64 +++++++++++++++---- 2 files changed, 54 insertions(+), 66 deletions(-) diff --git a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs index 912dc2adc2076..907036005378a 100644 --- a/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs +++ b/crates/sui-indexer/src/handlers/objects_snapshot_processor.rs @@ -7,20 +7,16 @@ use futures::StreamExt; use mysten_metrics::get_metrics; use mysten_metrics::metered_channel::{Receiver, Sender}; use mysten_metrics::spawn_monitored_task; -use prometheus::Registry; -use sui_data_ingestion_core::{ - DataIngestionMetrics, IndexerExecutor, ReaderOptions, ShimProgressStore, Worker, WorkerPool, -}; +use sui_data_ingestion_core::Worker; use sui_package_resolver::{PackageStoreWithLruCache, Resolver}; use sui_rest_api::CheckpointData; use sui_types::messages_checkpoint::CheckpointSequenceNumber; -use tokio::sync::{oneshot, watch}; +use tokio::sync::watch; use tokio_util::sync::CancellationToken; use tracing::info; use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver}; use crate::types::IndexerResult; -use crate::IndexerConfig; use crate::{metrics::IndexerMetrics, store::IndexerStore}; use std::sync::{Arc, Mutex}; @@ -125,11 +121,9 @@ where pub async fn start_objects_snapshot_processor( store: S, metrics: IndexerMetrics, - config: IndexerConfig, snapshot_config: SnapshotLagConfig, - reader_options: ReaderOptions, cancel: CancellationToken, -) -> IndexerResult<()> +) -> IndexerResult<(ObjectsSnapshotProcessor, u64)> where S: IndexerStore + Clone + Sync + Send + 'static, T: R2D2Connection + 'static, @@ -142,21 +136,7 @@ where .expect("Failed to get latest snapshot checkpoint sequence number from DB") .map(|seq| seq + 1) .unwrap_or_default(); - let download_queue_size = std::env::var("DOWNLOAD_QUEUE_SIZE") - .unwrap_or_else(|_| crate::indexer::DOWNLOAD_QUEUE_SIZE.to_string()) - .parse::() - .expect("Invalid DOWNLOAD_QUEUE_SIZE"); - // Set up exit sender and receiver. Sender signals the ingestion executor to exit when - // the cancel token is triggered. - let (exit_sender, exit_receiver) = oneshot::channel(); - let cancel_clone = cancel.clone(); - spawn_monitored_task!(async move { - cancel_clone.cancelled().await; - let _ = exit_sender.send(()); - }); - - // Commit notifier to to signal the package buffer that a checkpoint has been committed. let (commit_notifier, commit_receiver) = watch::channel(None); let global_metrics = get_metrics().unwrap(); @@ -176,34 +156,6 @@ where commit_receiver, metrics.clone(), ); - let mut executor = IndexerExecutor::new( - ShimProgressStore(watermark), - 1, - DataIngestionMetrics::new(&Registry::new()), - ); - let worker_pool = WorkerPool::new( - worker, - "objects_snapshot_worker".to_string(), - download_queue_size, - ); - - executor.register(worker_pool).await?; - - spawn_monitored_task!(async move { - info!("Starting data ingestion executor for processing objects snapshot..."); - let _ = executor - .run( - config - .data_ingestion_path - .clone() - .unwrap_or(tempfile::tempdir().unwrap().into_path()), - config.remote_store_url.clone(), - vec![], - reader_options, - exit_receiver, - ) - .await; - }); // Now start the task that will commit the indexed object changes to the store. spawn_monitored_task!(ObjectsSnapshotProcessor::::commit_objects_snapshot( @@ -215,7 +167,7 @@ where snapshot_config, cancel, )); - Ok(()) + Ok((worker, watermark)) } impl ObjectsSnapshotProcessor diff --git a/crates/sui-indexer/src/indexer.rs b/crates/sui-indexer/src/indexer.rs index 4d6b12ff04858..6fede882c949f 100644 --- a/crates/sui-indexer/src/indexer.rs +++ b/crates/sui-indexer/src/indexer.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashMap; use std::env; use anyhow::Result; @@ -10,10 +11,12 @@ use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tracing::info; +use async_trait::async_trait; use mysten_metrics::spawn_monitored_task; use sui_data_ingestion_core::{ - DataIngestionMetrics, IndexerExecutor, ReaderOptions, ShimProgressStore, WorkerPool, + DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, WorkerPool, }; +use sui_types::messages_checkpoint::CheckpointSequenceNumber; use crate::build_json_rpc_server; use crate::errors::IndexerError; @@ -70,7 +73,7 @@ impl Indexer { env!("CARGO_PKG_VERSION") ); - let watermark = store + let primary_watermark = store .get_latest_checkpoint_sequence_number() .await .expect("Failed to get latest tx checkpoint sequence number from DB") @@ -96,15 +99,14 @@ impl Indexer { }; // Start objects snapshot processor, which is a separate pipeline with its ingestion pipeline. - start_objects_snapshot_processor::( - store.clone(), - metrics.clone(), - config.clone(), - snapshot_config, - extra_reader_options.clone(), - cancel.clone(), - ) - .await?; + let (object_snapshot_worker, object_snapshot_watermark) = + start_objects_snapshot_processor::( + store.clone(), + metrics.clone(), + snapshot_config, + cancel.clone(), + ) + .await?; let cancel_clone = cancel.clone(); let (exit_sender, exit_receiver) = oneshot::channel(); @@ -115,13 +117,24 @@ impl Indexer { }); let mut executor = IndexerExecutor::new( - ShimProgressStore(watermark), + ShimIndexerProgressStore::new(vec![ + ("primary".to_string(), primary_watermark), + ("object_snapshot".to_string(), object_snapshot_watermark), + ]), 1, DataIngestionMetrics::new(&Registry::new()), ); - let worker = new_handlers::(store, metrics, watermark, cancel.clone()).await?; - let worker_pool = WorkerPool::new(worker, "workflow".to_string(), download_queue_size); + let worker = + new_handlers::(store, metrics, primary_watermark, cancel.clone()).await?; + let worker_pool = WorkerPool::new(worker, "primary".to_string(), download_queue_size); + executor.register(worker_pool).await?; + + let worker_pool = WorkerPool::new( + object_snapshot_worker, + "object_snapshot".to_string(), + download_queue_size, + ); executor.register(worker_pool).await?; info!("Starting data ingestion executor..."); executor @@ -159,3 +172,26 @@ impl Indexer { Ok(()) } } + +struct ShimIndexerProgressStore { + watermarks: HashMap, +} + +impl ShimIndexerProgressStore { + fn new(watermarks: Vec<(String, CheckpointSequenceNumber)>) -> Self { + Self { + watermarks: watermarks.into_iter().collect(), + } + } +} + +#[async_trait] +impl ProgressStore for ShimIndexerProgressStore { + async fn load(&mut self, task_name: String) -> Result { + Ok(*self.watermarks.get(&task_name).expect("missing watermark")) + } + + async fn save(&mut self, _: String, _: CheckpointSequenceNumber) -> Result<()> { + Ok(()) + } +}