indexer-alt: committer handles empty batches, checkpoint stream
## Description

This change handles two edge cases related to out-of-order commits that
were uncovered through the work on event indices. In both cases the
committer could get stuck, because none of the conditions guarding
select arms were met, but the scenario was different in each case:

- In the first case, the committer could get stuck because there were no
  more pending rows to write, but there were still pre-committed
  checkpoints to handle, and the logic to update watermarks based on the
  pre-committed checkpoints was guarded by a test that
  `pending_rows > 0`.
- In the second case, the committer could get stuck because the pipeline
  shut down before any checkpoints came through, meaning it had no work
  to do.

The fix was to allow the main `poll.tick()` arm to run if the receiving
channel was closed, or there were pending precommits left. A short
circuit was also added to move empty batches directly into the precommit
list because we can treat them as already written out.
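
As a rough illustration of why the old guard could leave the committer stuck, the sketch below models only the condition on the `poll.tick()` arm as a plain predicate. The `CommitterState` struct and the `old_guard`/`new_guard` functions are hypothetical names introduced for this example; the field names mirror the variables in the diff, and the real code evaluates this condition inline in a select arm rather than through helper functions.

```rust
/// Hypothetical snapshot of the state the `poll.tick()` guard inspects.
/// The struct does not exist in the real code; it only groups the values
/// that the guard in the diff reads.
#[derive(Clone, Copy, Debug)]
pub struct CommitterState {
    /// True once the receiving channel is closed (`rx.is_closed()`).
    pub rx_closed: bool,
    /// Number of rows buffered but not yet written out.
    pub pending_rows: usize,
    /// Number of pre-committed checkpoints still waiting to be folded
    /// into the watermark (`precommitted.len()`).
    pub precommitted: usize,
}

/// Guard before this change: tick only when there are rows to write.
pub fn old_guard(s: CommitterState) -> bool {
    s.pending_rows > 0
}

/// Guard after this change: also tick when the channel is closed or when
/// pre-committed checkpoints are left to process.
pub fn new_guard(s: CommitterState) -> bool {
    s.rx_closed || s.pending_rows > 0 || s.precommitted > 0
}
```

With no pending rows but three pre-committed checkpoints (the first case), `old_guard` returns `false` and `new_guard` returns `true`. The same holds for a closed channel with nothing buffered (the second case), which is what lets the arm run and the committer make progress.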

## Test plan

Ran the following pipeline twice: The first time, it should exit without
writing any data (except the watermark), and the second time it should
just exit because there is no data between its watermark and the given
last checkpoint:

```
sui$ cargo run -p sui-indexer-alt --                                             \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  --remote-store-url https://checkpoints.mainnet.sui.io                          \
  --last-checkpoint 1000 --pipeline ev_struct_inst
```
amnn committed Oct 19, 2024
1 parent 05b6018 commit 31f4571
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions crates/sui-indexer-alt/src/handlers/mod.rs
@@ -353,7 +353,7 @@ fn committer<H: Handler + 'static>(
             }

             // Time to write out another batch of rows, if there are any.
-            _ = poll.tick(), if pending_rows > 0 => {
+            _ = poll.tick(), if rx.is_closed() || pending_rows > 0 || precommitted.len() > 0 => {
                 let Ok(mut conn) = db.connect().await else {
                     warn!(pipeline = H::NAME, "Failed to get connection for DB");
                     cool.reset();
@@ -438,24 +438,28 @@ fn committer<H: Handler + 'static>(
                     // If we go down this route, we should consider factoring that work out into a
                     // separate task that also handles the watermark.

-                    let affected = match H::commit(&batch_values, &mut conn).await {
-                        Ok(affected) => affected,
+                    let affected = if batch_values.is_empty() {
+                        0
+                    } else {
+                        match H::commit(&batch_values, &mut conn).await {
+                            Ok(affected) => affected,

-                        Err(e) => {
-                            let elapsed = guard.stop_and_record();
+                            Err(e) => {
+                                let elapsed = guard.stop_and_record();

-                            error!(
-                                pipeline = H::NAME,
-                                elapsed_ms = elapsed * 1000.0,
-                                attempt,
-                                committed = batch_values.len(),
-                                pending = pending_rows,
-                                "Error writing batch: {e}",
-                            );
+                                error!(
+                                    pipeline = H::NAME,
+                                    elapsed_ms = elapsed * 1000.0,
+                                    attempt,
+                                    committed = batch_values.len(),
+                                    pending = pending_rows,
+                                    "Error writing batch: {e}",
+                                );

-                            cool.reset_after(RETRY_INTERVAL);
-                            attempt += 1;
-                            continue;
+                                cool.reset_after(RETRY_INTERVAL);
+                                attempt += 1;
+                                continue;
+                            }
                         }
                     };

@@ -632,14 +636,21 @@ fn committer<H: Handler + 'static>(
                     poll.reset_immediately();
                 }

-                Some(indexed )= rx.recv(), if pending_rows < H::MAX_PENDING_SIZE => {
+                Some(indexed) = rx.recv(), if pending_rows < H::MAX_PENDING_SIZE => {
                     metrics
                         .total_committer_rows_received
                         .with_label_values(&[H::NAME])
                         .inc_by(indexed.values.len() as u64);

-                    pending_rows += indexed.values.len();
-                    pending.insert(indexed.cp_sequence_number, indexed);
+                    if indexed.values.is_empty() {
+                        // If the handler sends an empty batch, short-circuit the commit, and add
+                        // it directly to the pre-commit list.
+                        let (watermark, _) = indexed.into_batch();
+                        precommitted.insert(watermark);
+                    } else {
+                        pending_rows += indexed.values.len();
+                        pending.insert(indexed.cp_sequence_number, indexed);
+                    }
                 }
             }
         }
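
The short circuit for empty batches in the last hunk can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the code from `handlers/mod.rs`: `Indexed` and `Buffers` are hypothetical stand-ins, rows are plain strings, and the pre-commit list stores only the checkpoint sequence number, whereas the real code stores the watermark returned by `indexed.into_batch()`.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical stand-in for the batch a handler sends to the committer.
pub struct Indexed {
    pub cp_sequence_number: u64,
    /// Rows produced for this checkpoint (plain strings in this sketch).
    pub values: Vec<String>,
}

/// Hypothetical grouping of the committer's buffers.
#[derive(Default)]
pub struct Buffers {
    /// Total number of rows waiting to be written.
    pub pending_rows: usize,
    /// Batches waiting to be written, keyed by checkpoint.
    pub pending: BTreeMap<u64, Indexed>,
    /// Checkpoints treated as written, waiting for the watermark update.
    /// Simplification: the real list holds watermark values, not bare numbers.
    pub precommitted: BTreeSet<u64>,
}

impl Buffers {
    /// Route an incoming batch: empty batches skip the write path entirely,
    /// because there is nothing to commit for them.
    pub fn receive(&mut self, indexed: Indexed) {
        if indexed.values.is_empty() {
            self.precommitted.insert(indexed.cp_sequence_number);
        } else {
            self.pending_rows += indexed.values.len();
            self.pending.insert(indexed.cp_sequence_number, indexed);
        }
    }
}
```

An empty batch therefore never contributes to `pending_rows`, which is why the `poll.tick()` guard also has to check the pre-commit list: otherwise a run of empty checkpoints would leave the watermark update waiting with no pending rows to trigger it.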