[indexer] run all workflows on the same executor
phoenix-o authored and bmwill committed Jun 18, 2024
1 parent 0f691db commit da0c6d6
Showing 2 changed files with 54 additions and 66 deletions.
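
In brief: start_objects_snapshot_processor previously spun up its own IndexerExecutor (with a ShimProgressStore, its own DOWNLOAD_QUEUE_SIZE lookup, and a dedicated exit channel), so the indexer ran two ingestion executors side by side. After this commit, the function only builds the snapshot worker and returns it together with its starting watermark; Indexer::start then registers both pipelines as named worker pools ("primary" and "object_snapshot") on a single executor, backed by a ShimIndexerProgressStore that serves one watermark per task name. Condensed from the diff below (setup and error handling elided, not runnable on its own):

    let mut executor = IndexerExecutor::new(
        ShimIndexerProgressStore::new(vec![
            ("primary".to_string(), primary_watermark),
            ("object_snapshot".to_string(), object_snapshot_watermark),
        ]),
        1,
        DataIngestionMetrics::new(&Registry::new()),
    );
    executor
        .register(WorkerPool::new(worker, "primary".to_string(), download_queue_size))
        .await?;
    executor
        .register(WorkerPool::new(
            object_snapshot_worker,
            "object_snapshot".to_string(),
            download_queue_size,
        ))
        .await?;
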
56 changes: 4 additions & 52 deletions crates/sui-indexer/src/handlers/objects_snapshot_processor.rs
@@ -7,20 +7,16 @@ use futures::StreamExt;
 use mysten_metrics::get_metrics;
 use mysten_metrics::metered_channel::{Receiver, Sender};
 use mysten_metrics::spawn_monitored_task;
-use prometheus::Registry;
-use sui_data_ingestion_core::{
-    DataIngestionMetrics, IndexerExecutor, ReaderOptions, ShimProgressStore, Worker, WorkerPool,
-};
+use sui_data_ingestion_core::Worker;
 use sui_package_resolver::{PackageStoreWithLruCache, Resolver};
 use sui_rest_api::CheckpointData;
 use sui_types::messages_checkpoint::CheckpointSequenceNumber;
-use tokio::sync::{oneshot, watch};
+use tokio::sync::watch;
 use tokio_util::sync::CancellationToken;
 use tracing::info;
 
 use crate::store::package_resolver::{IndexerStorePackageResolver, InterimPackageResolver};
 use crate::types::IndexerResult;
-use crate::IndexerConfig;
 use crate::{metrics::IndexerMetrics, store::IndexerStore};
 use std::sync::{Arc, Mutex};
 
@@ -125,11 +121,9 @@ where
 pub async fn start_objects_snapshot_processor<S, T>(
     store: S,
     metrics: IndexerMetrics,
-    config: IndexerConfig,
     snapshot_config: SnapshotLagConfig,
-    reader_options: ReaderOptions,
     cancel: CancellationToken,
-) -> IndexerResult<()>
+) -> IndexerResult<(ObjectsSnapshotProcessor<S, T>, u64)>
 where
     S: IndexerStore + Clone + Sync + Send + 'static,
     T: R2D2Connection + 'static,
@@ -142,21 +136,7 @@ where
         .expect("Failed to get latest snapshot checkpoint sequence number from DB")
         .map(|seq| seq + 1)
         .unwrap_or_default();
-    let download_queue_size = std::env::var("DOWNLOAD_QUEUE_SIZE")
-        .unwrap_or_else(|_| crate::indexer::DOWNLOAD_QUEUE_SIZE.to_string())
-        .parse::<usize>()
-        .expect("Invalid DOWNLOAD_QUEUE_SIZE");
-
-    // Set up exit sender and receiver. Sender signals the ingestion executor to exit when
-    // the cancel token is triggered.
-    let (exit_sender, exit_receiver) = oneshot::channel();
-    let cancel_clone = cancel.clone();
-    spawn_monitored_task!(async move {
-        cancel_clone.cancelled().await;
-        let _ = exit_sender.send(());
-    });
-
     // Commit notifier to signal the package buffer that a checkpoint has been committed.
     let (commit_notifier, commit_receiver) = watch::channel(None);
 
     let global_metrics = get_metrics().unwrap();
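
Note: the DOWNLOAD_QUEUE_SIZE handling deleted above is not lost; the caller in indexer.rs already derives download_queue_size the same way. As a standalone sketch of the pattern (the constant's value here is an assumption for illustration, not taken from this commit):

    // Sketch of the env-override pattern removed above; the default value is assumed.
    const DOWNLOAD_QUEUE_SIZE: usize = 200;

    fn download_queue_size() -> usize {
        std::env::var("DOWNLOAD_QUEUE_SIZE")
            .unwrap_or_else(|_| DOWNLOAD_QUEUE_SIZE.to_string())
            .parse::<usize>()
            .expect("Invalid DOWNLOAD_QUEUE_SIZE")
    }
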
@@ -176,34 +156,6 @@ where
         commit_receiver,
         metrics.clone(),
     );
-    let mut executor = IndexerExecutor::new(
-        ShimProgressStore(watermark),
-        1,
-        DataIngestionMetrics::new(&Registry::new()),
-    );
-    let worker_pool = WorkerPool::new(
-        worker,
-        "objects_snapshot_worker".to_string(),
-        download_queue_size,
-    );
-
-    executor.register(worker_pool).await?;
-
-    spawn_monitored_task!(async move {
-        info!("Starting data ingestion executor for processing objects snapshot...");
-        let _ = executor
-            .run(
-                config
-                    .data_ingestion_path
-                    .clone()
-                    .unwrap_or(tempfile::tempdir().unwrap().into_path()),
-                config.remote_store_url.clone(),
-                vec![],
-                reader_options,
-                exit_receiver,
-            )
-            .await;
-    });
 
     // Now start the task that will commit the indexed object changes to the store.
     spawn_monitored_task!(ObjectsSnapshotProcessor::<S, T>::commit_objects_snapshot(
@@ -215,7 +167,7 @@
         snapshot_config,
         cancel,
     ));
-    Ok(())
+    Ok((worker, watermark))
 }
 
 impl<S, T> ObjectsSnapshotProcessor<S, T>
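
One detail the refactor keeps intact: the processor still signals its package buffer through a tokio watch channel (the commit_notifier above), which exposes only the most recently committed checkpoint rather than a queue of events. A minimal self-contained sketch of that pattern, with a hypothetical checkpoint number:

    use tokio::sync::watch;

    #[tokio::main]
    async fn main() {
        // The committer publishes the latest committed checkpoint sequence number;
        // observers read the current value without queueing history.
        let (commit_notifier, mut commit_receiver) = watch::channel(None::<u64>);

        tokio::spawn(async move {
            commit_notifier.send(Some(42)).expect("receiver dropped");
        });

        // changed() resolves once a new value has been published.
        commit_receiver.changed().await.expect("sender dropped");
        assert_eq!(*commit_receiver.borrow(), Some(42));
    }
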
64 changes: 50 additions & 14 deletions crates/sui-indexer/src/indexer.rs
@@ -1,6 +1,7 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
+use std::collections::HashMap;
 use std::env;
 
 use anyhow::Result;
@@ -10,10 +11,12 @@ use tokio::sync::oneshot;
 use tokio_util::sync::CancellationToken;
 use tracing::info;
 
+use async_trait::async_trait;
 use mysten_metrics::spawn_monitored_task;
 use sui_data_ingestion_core::{
-    DataIngestionMetrics, IndexerExecutor, ReaderOptions, ShimProgressStore, WorkerPool,
+    DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, WorkerPool,
 };
+use sui_types::messages_checkpoint::CheckpointSequenceNumber;
 
 use crate::build_json_rpc_server;
 use crate::errors::IndexerError;
@@ -70,7 +73,7 @@ impl Indexer {
             env!("CARGO_PKG_VERSION")
         );
 
-        let watermark = store
+        let primary_watermark = store
             .get_latest_checkpoint_sequence_number()
             .await
             .expect("Failed to get latest tx checkpoint sequence number from DB")
@@ -96,15 +99,14 @@ impl Indexer {
         };
 
         // Start objects snapshot processor, which is a separate pipeline with its own ingestion pipeline.
-        start_objects_snapshot_processor::<S, T>(
-            store.clone(),
-            metrics.clone(),
-            config.clone(),
-            snapshot_config,
-            extra_reader_options.clone(),
-            cancel.clone(),
-        )
-        .await?;
+        let (object_snapshot_worker, object_snapshot_watermark) =
+            start_objects_snapshot_processor::<S, T>(
+                store.clone(),
+                metrics.clone(),
+                snapshot_config,
+                cancel.clone(),
+            )
+            .await?;
 
         let cancel_clone = cancel.clone();
         let (exit_sender, exit_receiver) = oneshot::channel();
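
The cancellation bridge removed from objects_snapshot_processor.rs survives here unchanged: a monitored task waits on the CancellationToken and fires a oneshot that tells the shared executor to exit. A self-contained sketch of that bridge (simplified; spawn_monitored_task replaced by a plain tokio::spawn):

    use tokio::sync::oneshot;
    use tokio_util::sync::CancellationToken;

    #[tokio::main]
    async fn main() {
        let cancel = CancellationToken::new();
        let (exit_sender, exit_receiver) = oneshot::channel::<()>();

        let cancel_clone = cancel.clone();
        tokio::spawn(async move {
            cancel_clone.cancelled().await;
            let _ = exit_sender.send(());
        });

        cancel.cancel();
        // In the indexer, exit_receiver is handed to the executor's run(),
        // which shuts down once it resolves.
        exit_receiver.await.expect("exit signal sender dropped");
    }
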
@@ -115,13 +117,24 @@ impl Indexer {
         });
 
         let mut executor = IndexerExecutor::new(
-            ShimProgressStore(watermark),
+            ShimIndexerProgressStore::new(vec![
+                ("primary".to_string(), primary_watermark),
+                ("object_snapshot".to_string(), object_snapshot_watermark),
+            ]),
             1,
             DataIngestionMetrics::new(&Registry::new()),
         );
-        let worker = new_handlers::<S, T>(store, metrics, watermark, cancel.clone()).await?;
-        let worker_pool = WorkerPool::new(worker, "workflow".to_string(), download_queue_size);
+        let worker =
+            new_handlers::<S, T>(store, metrics, primary_watermark, cancel.clone()).await?;
+        let worker_pool = WorkerPool::new(worker, "primary".to_string(), download_queue_size);
 
         executor.register(worker_pool).await?;
 
+        let worker_pool = WorkerPool::new(
+            object_snapshot_worker,
+            "object_snapshot".to_string(),
+            download_queue_size,
+        );
+        executor.register(worker_pool).await?;
         info!("Starting data ingestion executor...");
         executor
@@ -159,3 +172,26 @@
         Ok(())
     }
 }
+
+struct ShimIndexerProgressStore {
+    watermarks: HashMap<String, CheckpointSequenceNumber>,
+}
+
+impl ShimIndexerProgressStore {
+    fn new(watermarks: Vec<(String, CheckpointSequenceNumber)>) -> Self {
+        Self {
+            watermarks: watermarks.into_iter().collect(),
+        }
+    }
+}
+
+#[async_trait]
+impl ProgressStore for ShimIndexerProgressStore {
+    async fn load(&mut self, task_name: String) -> Result<CheckpointSequenceNumber> {
+        Ok(*self.watermarks.get(&task_name).expect("missing watermark"))
+    }
+
+    async fn save(&mut self, _: String, _: CheckpointSequenceNumber) -> Result<()> {
+        Ok(())
+    }
+}
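
To make the shim's contract concrete, a hypothetical test (not part of this commit): load serves the watermark registered for each task name, while save is deliberately a no-op because each pipeline persists its own progress to the database.

    // Hypothetical test, not part of this commit.
    #[tokio::test]
    async fn shim_progress_store_serves_per_task_watermarks() {
        let mut store = ShimIndexerProgressStore::new(vec![
            ("primary".to_string(), 100),
            ("object_snapshot".to_string(), 42),
        ]);
        assert_eq!(store.load("primary".to_string()).await.unwrap(), 100);
        assert_eq!(store.load("object_snapshot".to_string()).await.unwrap(), 42);

        // save is a no-op: watermark persistence stays with the individual pipelines.
        store.save("primary".to_string(), 101).await.unwrap();
        assert_eq!(store.load("primary".to_string()).await.unwrap(), 100);
    }
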
