Skip to content

Commit

Permalink
Pick: indexer handler: limit unprocessed buffer size #19913 (#19917)
Browse files Browse the repository at this point in the history
## Description 

without this change, the unprocessed buffer will grow unboundedly until
OOM
it did not manifest on previous processor b/c it has sleep codes of 
```
_ = tokio::time::sleep(std::time::Duration::from_secs(config.sleep_duration))
```
and `sleep_duration` is 5 seconds.

## Test plan 

- correctness via the added objects_snapshot test
- oom via experiment result on benchmark env with mem usage
[link](https://app.datadoghq.com/metric/explorer?fromUser=false&start=1729280070529&end=1729283670529&paused=false#N4Ig7glgJg5gpgFxALlAGwIYE8D2BXJVEADxQEYAaELcqyKBAC1pEbghkcLIF8qo4AMwgA7CAgg4RKUAiwAHOChASAtnADOcAE4RNIKtrgBHPJoQaUAbVBGN8qVoD6gnNtUZCKiOq279VKY6epbINiAiGOrKQdpYZAYgUJ4YThr42gDGSsgg6gi6mZaBZnHKGABuMMiZUggYojoAdOqqblhNeBoY8MAA1ngARnBOkb7yGNnI2vKZALTDIpmMHtp9AAQU67Ui9Y3ao1FwyBp4EHOiAsQ6POuDWOvADlCH6jwgPAC6VK7ueJihcK-VT-DAxUrxD7fEAaORoHKgCbwhAIHJJHAwJyZAEaCCZRJoRpOOSKZTpQlQAlE+hMZQiNweNAffgQeyYLDEhRowkiJRfHh8GHyQkIADCUmEMBQIn+aB4QA)


---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
gegaowp authored Oct 18, 2024
1 parent 775f2cb commit 95dcb93
Showing 1 changed file with 18 additions and 9 deletions.
27 changes: 18 additions & 9 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub mod pruner;
pub mod tx_processor;

pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100;
pub(crate) const UNPROCESSED_CHECKPOINT_SIZE_LIMIT: usize = 1000;

#[derive(Debug)]
pub struct CheckpointDataToCommit {
Expand Down Expand Up @@ -116,17 +117,25 @@ impl<T> CommonHandler<T> {
}

// Try to fetch new data tuple from the stream
match stream.next().now_or_never() {
Some(Some(tuple_chunk)) => {
if cancel.is_cancelled() {
return Ok(());
}
for tuple in tuple_chunk {
unprocessed.insert(tuple.0.cp, tuple);
if unprocessed.len() >= UNPROCESSED_CHECKPOINT_SIZE_LIMIT {
tracing::info!(
"Unprocessed checkpoint size reached limit {}, skip reading from stream...",
UNPROCESSED_CHECKPOINT_SIZE_LIMIT
);
} else {
// Try to fetch new data tuple from the stream
match stream.next().now_or_never() {
Some(Some(tuple_chunk)) => {
if cancel.is_cancelled() {
return Ok(());
}
for tuple in tuple_chunk {
unprocessed.insert(tuple.0.cp, tuple);
}
}
Some(None) => break, // Stream has ended
None => {} // No new data tuple available right now
}
Some(None) => break, // Stream has ended
None => {} // No new data tuple available right now
}

// Process unprocessed checkpoints, even no new checkpoints from stream
Expand Down

0 comments on commit 95dcb93

Please sign in to comment.