diff --git a/crates/sui-indexer/src/handlers/mod.rs b/crates/sui-indexer/src/handlers/mod.rs index e9984cb1064b8..c36b3b4d13f4a 100644 --- a/crates/sui-indexer/src/handlers/mod.rs +++ b/crates/sui-indexer/src/handlers/mod.rs @@ -30,6 +30,7 @@ pub mod pruner; pub mod tx_processor; pub(crate) const CHECKPOINT_COMMIT_BATCH_SIZE: usize = 100; +pub(crate) const UNPROCESSED_CHECKPOINT_SIZE_LIMIT: usize = 1000; #[derive(Debug)] pub struct CheckpointDataToCommit { @@ -116,17 +117,25 @@ impl CommonHandler { } // Try to fetch new data tuple from the stream - match stream.next().now_or_never() { - Some(Some(tuple_chunk)) => { - if cancel.is_cancelled() { - return Ok(()); - } - for tuple in tuple_chunk { - unprocessed.insert(tuple.0.cp, tuple); + if unprocessed.len() >= UNPROCESSED_CHECKPOINT_SIZE_LIMIT { + tracing::info!( + "Unprocessed checkpoint size reached limit {}, skip reading from stream...", + UNPROCESSED_CHECKPOINT_SIZE_LIMIT + ); + } else { + // Try to fetch new data tuple from the stream + match stream.next().now_or_never() { + Some(Some(tuple_chunk)) => { + if cancel.is_cancelled() { + return Ok(()); + } + for tuple in tuple_chunk { + unprocessed.insert(tuple.0.cp, tuple); + } } + Some(None) => break, // Stream has ended + None => {} // No new data tuple available right now } - Some(None) => break, // Stream has ended - None => {} // No new data tuple available right now } // Process unprocessed checkpoints, even no new checkpoints from stream