Skip to content

Commit

Permalink
[data ingestion] introduce upper limit for ingestion job (#19225)
Browse files Browse the repository at this point in the history
## Description 

Introduces a new parameter: an upper limit for data ingestion. Once the
framework reaches this checkpoint, it will gracefully shut down

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
phoenix-o authored Sep 6, 2024
1 parent 072ebd9 commit 03d1667
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
6 changes: 6 additions & 0 deletions crates/sui-data-ingestion-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
mut exit_receiver: oneshot::Receiver<()>,
) -> Result<ExecutorProgress> {
let mut reader_checkpoint_number = self.progress_store.min_watermark()?;
let upper_limit = reader_options.upper_limit;
let (checkpoint_reader, mut checkpoint_recv, gc_sender, _exit_sender) =
CheckpointReader::initialize(
path,
Expand All @@ -93,6 +94,11 @@ impl<P: ProgressStore> IndexerExecutor<P> {
self.metrics.data_ingestion_checkpoint.with_label_values(&[&task_name]).set(sequence_number as i64);
}
Some(checkpoint) = checkpoint_recv.recv() => {
if let Some(limit) = upper_limit {
if checkpoint.checkpoint_summary.sequence_number > limit {
break;
}
}
for sender in &self.pool_senders {
sender.send(checkpoint.clone()).await?;
}
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-data-ingestion-core/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct ReaderOptions {
/// number of maximum concurrent requests to the remote store. Increase it for backfills
pub batch_size: usize,
pub data_limit: usize,
pub upper_limit: Option<CheckpointSequenceNumber>,
}

impl Default for ReaderOptions {
Expand All @@ -60,6 +61,7 @@ impl Default for ReaderOptions {
timeout_secs: 5,
batch_size: 10,
data_limit: 0,
upper_limit: None,
}
}
}
Expand Down

0 comments on commit 03d1667

Please sign in to comment.