Delete consensus data from tables when it is no longer needed (#19697)
This PR is in preparation for data quarantining. The DQ PR will move the
tables affected by this PR entirely into memory, and crash recovery will
be driven by re-processing consensus commits.

However, when we deploy DQ to a validator for the first time, we will
restart in a state where uncertified consensus commits have already been
marked as processed, so they will not be re-processed on startup.

This means that upon starting up for the first time, some data will be
on disk instead of in memory. There are two possible solutions to this:

- All reads fall back to the database. This requires a lot of ugly
fallback code and is slow.
- All data in the database is read into memory at startup. This is fast
and simple.

This PR bounds the amount of data we will have to read at startup in
order to make the second option feasible.
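The second option above can be sketched with an in-memory stand-in. This is an illustration only, assuming a `BTreeMap` in place of the real typed RocksDB tables; `load_table_into_memory` and the key/value types are hypothetical names, not the actual sui-core API:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in for a bounded on-disk table. Because the PR above
// deletes rows as soon as they are no longer needed, the table stays small
// enough that a full scan at startup is cheap.
fn load_table_into_memory(disk_table: &BTreeMap<u64, String>) -> BTreeMap<u64, String> {
    // Scan the whole table once and keep a copy in memory, so subsequent
    // reads never have to fall back to the database.
    disk_table
        .iter()
        .map(|(k, v)| (*k, v.clone()))
        .collect()
}

fn main() {
    let mut disk = BTreeMap::new();
    disk.insert(1, "commit-a".to_string());
    disk.insert(2, "commit-b".to_string());

    let in_memory = load_table_into_memory(&disk);
    assert_eq!(in_memory.len(), 2);
    println!("loaded {} entries at startup", in_memory.len());
}
```

Once the startup scan completes, the on-disk copy only matters for crash recovery, which is why bounding its size is the prerequisite for moving these tables fully into memory.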
mystenmark authored Oct 4, 2024
1 parent 3b665eb commit fedfb0d
Showing 1 changed file with 25 additions and 3 deletions.
crates/sui-core/src/authority/authority_per_epoch_store.rs (25 additions, 3 deletions)

@@ -1464,6 +1464,17 @@ impl AuthorityPerEpochStore {
         // Now that the transaction effects are committed, we will never re-execute, so we
         // don't need to worry about equivocating.
         batch.delete_batch(&tables.signed_effects_digests, digests)?;
 
+        // Note that this does not delete keys for random transactions. The worst case result
+        // of this is that we restart at the end of the epoch and load about 160k keys into
+        // memory.
+        batch.delete_batch(
+            &tables.assigned_shared_object_versions_v2,
+            digests.iter().map(|d| TransactionKey::Digest(*d)),
+        )?;
+
+        batch.delete_batch(&tables.user_signatures_for_checkpoints, digests)?;
+
         batch.write()?;
         Ok(())
     }

@@ -3750,10 +3761,11 @@ impl AuthorityPerEpochStore {
         commit_height: CheckpointHeight,
         content_info: Vec<(CheckpointSummary, CheckpointContents)>,
     ) -> SuiResult<()> {
+        let tables = self.tables()?;
         // All created checkpoints are inserted in builder_checkpoint_summary in a single batch.
         // This means that upon restart we can use BuilderCheckpointSummary::commit_height
         // from the last built summary to resume building checkpoints.
-        let mut batch = self.tables()?.pending_checkpoints_v2.batch();
+        let mut batch = tables.pending_checkpoints_v2.batch();
         for (position_in_commit, (summary, transactions)) in content_info.into_iter().enumerate() {
             let sequence_number = summary.sequence_number;
             let summary = BuilderCheckpointSummary {

@@ -3762,17 +3774,27 @@ impl AuthorityPerEpochStore {
                 position_in_commit,
             };
             batch.insert_batch(
-                &self.tables()?.builder_checkpoint_summary_v2,
+                &tables.builder_checkpoint_summary_v2,
                 [(&sequence_number, summary)],
             )?;
             batch.insert_batch(
-                &self.tables()?.builder_digest_to_checkpoint,
+                &tables.builder_digest_to_checkpoint,
                 transactions
                     .iter()
                     .map(|tx| (tx.transaction, sequence_number)),
             )?;
         }
 
+        // find all pending checkpoints <= commit_height and remove them
+        let iter = tables
+            .pending_checkpoints_v2
+            .safe_range_iter(0..=commit_height);
+        let keys = iter
+            .map(|c| c.map(|(h, _)| h))
+            .collect::<Result<Vec<_>, _>>()?;
+
+        batch.delete_batch(&tables.pending_checkpoints_v2, &keys)?;
+
        Ok(batch.write()?)
    }
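The pruning added in the second hunk (collect all keys at or below `commit_height`, then delete them in the same batch as the inserts) can be sketched with an in-memory model. This is a simplified illustration assuming a `BTreeMap` keyed by commit height; `prune_pending_checkpoints` is a hypothetical name, and the real code performs the deletes through a write batch so they commit atomically with the checkpoint inserts:

```rust
use std::collections::BTreeMap;

// Simplified model of pending_checkpoints_v2, keyed by commit height.
// Returns the heights that were pruned.
fn prune_pending_checkpoints(
    pending: &mut BTreeMap<u64, String>,
    commit_height: u64,
) -> Vec<u64> {
    // Mirror of the safe_range_iter(0..=commit_height) scan in the diff:
    // collect every key <= commit_height first, then delete them (the real
    // code does the deletion via batch.delete_batch).
    let keys: Vec<u64> = pending
        .range(0..=commit_height)
        .map(|(height, _)| *height)
        .collect();
    for k in &keys {
        pending.remove(k);
    }
    keys
}

fn main() {
    let mut pending = BTreeMap::new();
    pending.insert(5, "checkpoint-5".to_string());
    pending.insert(7, "checkpoint-7".to_string());
    pending.insert(9, "checkpoint-9".to_string());

    let removed = prune_pending_checkpoints(&mut pending, 7);
    assert_eq!(removed, vec![5, 7]);
    assert_eq!(pending.len(), 1);
    println!("pruned {} pending checkpoints", removed.len());
}
```

Collecting the keys before deleting avoids mutating the map while iterating over it; in the real code, batching the range-delete with the summary inserts means a crash between the two can never leave a checkpoint both built and still pending.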
