Skip to content

Commit

Permalink
fix(indexer-alt): Flush pending rows before shutdown
Browse files Browse the repository at this point in the history
## Description

When running the indexer on a finite range of checkpoints. Make sure
commiters' buffers are completely empty before shutting down their task,
otherwise we may not fully write out all the intended rows for the range
of checkpoints provided (there may be some data left at the bottom of
the barrel).

## Test plan

Ran the following:

```
cargo run -p sui-indexer-alt --release -- \
  --database-url "postgres://postgres:postgrespw@localhost:5432/sui_indexer_alt" \
  --remote-store-url https://checkpoints.mainnet.sui.io \
  --last-checkpoint 2000
```

Corroborated that the data that results in the DB at the end:

- Stops at the expected checkpoint (not before or after)
- Matches counts of rows in the production mainnet DB for the equivalent
  tables at the same checkpoints.

This can/should be made into an automated test, but that requires tempdb
and migrations to be implemented (a comment has been added to this
effect).
  • Loading branch information
amnn committed Oct 18, 2024
1 parent f847ad9 commit eb821d3
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions crates/sui-indexer-alt/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ fn committer<H: Handler + 'static>(
}
}

// TODO (amnn): Test this behaviour (requires tempdb and migrations).
if pending_rows == 0 && rx.is_closed() {
info!(pipeline = H::NAME, "Handler closed channel, pending rows empty, stopping committer");
break;
}

cool.reset();
}

Expand All @@ -615,19 +621,14 @@ fn committer<H: Handler + 'static>(
poll.reset_immediately();
}

indexed = rx.recv(), if pending_rows < H::MAX_PENDING_SIZE => {
if let Some(indexed) = indexed {
metrics
.total_committer_rows_received
.with_label_values(&[H::NAME])
.inc_by(indexed.values.len() as u64);
Some(indexed )= rx.recv(), if pending_rows < H::MAX_PENDING_SIZE => {
metrics
.total_committer_rows_received
.with_label_values(&[H::NAME])
.inc_by(indexed.values.len() as u64);

pending_rows += indexed.values.len();
pending.insert(indexed.cp_sequence_number, indexed);
} else {
info!(pipeline = H::NAME, "Handler closed channel, stopping committer");
break;
}
pending_rows += indexed.values.len();
pending.insert(indexed.cp_sequence_number, indexed);
}
}
}
Expand Down

0 comments on commit eb821d3

Please sign in to comment.