Skip to content

Commit

Permalink
Check for blocks to freeze state for on startup. Store the most recen…
Browse files Browse the repository at this point in the history
…t block state has been frozen for

Signed-off-by: Matthew Whitehead <[email protected]>
  • Loading branch information
matthew1001 committed Sep 5, 2024
1 parent 436c66a commit e914f22
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,6 @@ private BonsaiArchiveFreezer createBonsaiArchiveFreezer(
(BonsaiWorldStateKeyValueStorage) worldStateStorage,
blockchain,
scheduler::executeServiceTask,
10,
trieLogManager);
archiveFreezer.initialize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.DiffBasedWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogManager;
import org.hyperledger.besu.plugin.services.trielogs.TrieLog;
Expand Down Expand Up @@ -48,7 +49,8 @@ public class BonsaiArchiveFreezer implements BlockAddedObserver {
private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage;
private final Blockchain blockchain;
private final Consumer<Runnable> executeAsync;
private final long numberOfBlocksToKeepInWarmStorage;
private static final int PRELOAD_LIMIT = 1000;
private static final int DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE = 10;
private final TrieLogManager trieLogManager;

private final Multimap<Long, Hash> blocksToMoveToFreezer =
Expand All @@ -58,24 +60,56 @@ public BonsaiArchiveFreezer(
final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage,
final Blockchain blockchain,
final Consumer<Runnable> executeAsync,
final long numberOfBlocksToKeepInWarmStorage,
final TrieLogManager trieLogManager) {
this.rootWorldStateStorage = rootWorldStateStorage;
this.blockchain = blockchain;
this.executeAsync = executeAsync;
this.numberOfBlocksToKeepInWarmStorage = numberOfBlocksToKeepInWarmStorage;
this.trieLogManager = trieLogManager;
}

public int initialize() {
// TODO Probably need to freeze old blocks that haven't been frozen already?
return 0;
public void initialize() {
// On startup there will be recent blocks whose state and storage hasn't been archived yet.
// Pre-load them ready for freezing state once enough new blocks have been added to the chain.
Optional<Long> frozenBlocksHead = Optional.empty();

Optional<Long> latestFrozenBlock = rootWorldStateStorage.getLatestArchiveFrozenBlock();

if (latestFrozenBlock.isPresent()) {
frozenBlocksHead = latestFrozenBlock;
} else {
// Start from genesis block
if (blockchain.getBlockHashByNumber(0).isPresent()) {
frozenBlocksHead = Optional.of(0L);
}
}

if (frozenBlocksHead.isPresent()) {
int preLoadedBlocks = 0;
Optional<Block> nextBlock = blockchain.getBlockByNumber(frozenBlocksHead.get());
for (int i = 0; i < PRELOAD_LIMIT; i++) {
if (nextBlock.isPresent()) {
addToFreezerQueue(
nextBlock.get().getHeader().getNumber(), nextBlock.get().getHeader().getHash());
preLoadedBlocks++;
nextBlock = blockchain.getBlockByNumber(nextBlock.get().getHeader().getNumber() + 1);
} else {
break;
}
}
LOG.atInfo()
.setMessage("Preloaded {} blocks to move their state and storage to the archive freezer")
.addArgument(preLoadedBlocks)
.log();
}

// Start processing any backlog on startup - don't wait for a new block to be imported.
moveBlockStateToFreezer();
}

public synchronized void addToFreezerQueue(final long blockNumber, final Hash blockHash) {
LOG.atDebug()
.setMessage(
"adding block to archive freezer queue for moving to cold storage, blockNumber {}; blockHash {}")
"Adding block to archive freezer queue for moving to cold storage, blockNumber {}; blockHash {}")
.addArgument(blockNumber)
.addArgument(blockHash)
.log();
Expand All @@ -84,26 +118,26 @@ public synchronized void addToFreezerQueue(final long blockNumber, final Hash bl

public synchronized int moveBlockStateToFreezer() {
final long retainAboveThisBlock =
blockchain.getChainHeadBlockNumber() - numberOfBlocksToKeepInWarmStorage;
blockchain.getChainHeadBlockNumber() - DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE;

if (rootWorldStateStorage.getFlatDbMode().getVersion() == Bytes.EMPTY) {
throw new IllegalStateException("DB mode version not set");
}

AtomicInteger frozenAccountStateCount = new AtomicInteger();
AtomicInteger frozenAccountStorageCount = new AtomicInteger();

LOG.atDebug()
LOG.atTrace()
.setMessage(
"Moving cold state to freezer storage (chainHeadNumber: {} - numberOfBlocksToKeepInWarmStorage: {}) = {}")
.addArgument(blockchain::getChainHeadBlockNumber)
.addArgument(numberOfBlocksToKeepInWarmStorage)
.addArgument(DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE)
.addArgument(retainAboveThisBlock)
.log();

final var accountsToMove =
blocksToMoveToFreezer.asMap().entrySet().stream()
.dropWhile((e) -> e.getKey() > retainAboveThisBlock);
// TODO - limit to a configurable number of blocks to move per loop

final Multimap<Long, Hash> accountStateFreezerActionsComplete = ArrayListMultimap.create();
final Multimap<Long, Hash> accountStorageFreezerActionsComplete = ArrayListMultimap.create();
Expand Down Expand Up @@ -173,26 +207,28 @@ public synchronized int moveBlockStateToFreezer() {
// For us to consider all state and storage changes for a block complete, it must have been
// recorded in both accountState and accountStorage lists. If only one finished we need to try
// freezing state/storage for that block again on the next loop
int frozenBlocksCompleted = blocksToMoveToFreezer.size();
AtomicInteger frozenBlocksCompleted = new AtomicInteger();
accountStateFreezerActionsComplete
.keySet()
.forEach(
(b) -> {
if (accountStorageFreezerActionsComplete.containsKey(b)) {
frozenBlocksCompleted.getAndIncrement();
rootWorldStateStorage.setLatestArchiveFrozenBlock(b);
blocksToMoveToFreezer.removeAll(b);
}
});

if (frozenAccountStateCount.get() > 0 || frozenAccountStorageCount.get() > 0) {
LOG.atInfo()
.setMessage("froze {} account state entries, {} account storage entries for {} blocks")
LOG.atDebug()
.setMessage("Froze {} account state entries, {} account storage entries for {} blocks")
.addArgument(frozenAccountStateCount.get())
.addArgument(frozenAccountStorageCount.get())
.addArgument(frozenBlocksCompleted)
.addArgument(frozenBlocksCompleted.get())
.log();
}

return frozenBlocksCompleted;
return frozenBlocksCompleted.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public abstract class DiffBasedWorldStateKeyValueStorage
public static final byte[] WORLD_BLOCK_HASH_KEY =
"worldBlockHash".getBytes(StandardCharsets.UTF_8);

// 0x61726368697665426C6F636B7346726F7A656E
public static final byte[] ARCHIVE_BLOCKS_FROZEN =
"archiveBlocksFrozen".getBytes(StandardCharsets.UTF_8);

private final AtomicBoolean shouldClose = new AtomicBoolean(false);

protected final AtomicBoolean isClosed = new AtomicBoolean(false);
Expand Down Expand Up @@ -315,6 +319,22 @@ public int freezePreviousStorageState(
return frozenStateCount.get();
}

public Optional<Long> getLatestArchiveFrozenBlock() {
return composedWorldStateStorage
.get(ACCOUNT_INFO_STATE_FREEZER, ARCHIVE_BLOCKS_FROZEN)
.map(Bytes::wrap)
.map(Bytes::toLong);
}

public void setLatestArchiveFrozenBlock(final Long blockNumber) {
SegmentedKeyValueStorageTransaction tx = composedWorldStateStorage.startTransaction();
tx.put(
ACCOUNT_INFO_STATE_FREEZER,
ARCHIVE_BLOCKS_FROZEN,
Bytes.ofUnsignedLong(blockNumber).toArrayUnsafe());
tx.commit();
}

@Override
public synchronized void close() throws Exception {
// when the storage clears, close
Expand Down

0 comments on commit e914f22

Please sign in to comment.