From 03d1667cde26e433de27fea99c72da667067e080 Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Fri, 6 Sep 2024 08:42:53 -0400 Subject: [PATCH] [data ingestion] introduce upper limit for ingestion job (#19225) ## Description Introduces a new parameter: an upper limit for data ingestion. Once the framework reaches this checkpoint, it will gracefully shut down --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- crates/sui-data-ingestion-core/src/executor.rs | 6 ++++++ crates/sui-data-ingestion-core/src/reader.rs | 2 ++ 2 files changed, 8 insertions(+) diff --git a/crates/sui-data-ingestion-core/src/executor.rs b/crates/sui-data-ingestion-core/src/executor.rs index 92bfda006852a..0c1ef8d36ae99 100644 --- a/crates/sui-data-ingestion-core/src/executor.rs +++ b/crates/sui-data-ingestion-core/src/executor.rs @@ -67,6 +67,7 @@ impl IndexerExecutor

{ mut exit_receiver: oneshot::Receiver<()>, ) -> Result { let mut reader_checkpoint_number = self.progress_store.min_watermark()?; + let upper_limit = reader_options.upper_limit; let (checkpoint_reader, mut checkpoint_recv, gc_sender, _exit_sender) = CheckpointReader::initialize( path, @@ -93,6 +94,11 @@ impl IndexerExecutor

{ self.metrics.data_ingestion_checkpoint.with_label_values(&[&task_name]).set(sequence_number as i64); } Some(checkpoint) = checkpoint_recv.recv() => { + if let Some(limit) = upper_limit { + if checkpoint.checkpoint_summary.sequence_number > limit { + break; + } + } for sender in &self.pool_senders { sender.send(checkpoint.clone()).await?; } diff --git a/crates/sui-data-ingestion-core/src/reader.rs b/crates/sui-data-ingestion-core/src/reader.rs index f07e12272ee23..db25ed6d85fac 100644 --- a/crates/sui-data-ingestion-core/src/reader.rs +++ b/crates/sui-data-ingestion-core/src/reader.rs @@ -51,6 +51,7 @@ pub struct ReaderOptions { /// number of maximum concurrent requests to the remote store. Increase it for backfills pub batch_size: usize, pub data_limit: usize, + pub upper_limit: Option, } impl Default for ReaderOptions { @@ -60,6 +61,7 @@ impl Default for ReaderOptions { timeout_secs: 5, batch_size: 10, data_limit: 0, + upper_limit: None, } } }