Skip to content

Commit

Permalink
Refactor TrieLogPruner preload timeout to be more testable (#7393)
Browse files Browse the repository at this point in the history
Also update logging

Signed-off-by: Simon Dudley <[email protected]>
  • Loading branch information
siladu authored Jul 30, 2024
1 parent 4ace9e4 commit ec8429f
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.TreeMultimap;
Expand Down Expand Up @@ -91,53 +92,54 @@ public TrieLogPruner(
}

public void initialize() {
preloadQueueWithTimeout();
preloadQueueWithTimeout(PRELOAD_TIMEOUT_IN_SECONDS);
}

private void preloadQueueWithTimeout() {
@VisibleForTesting
void preloadQueueWithTimeout(final int timeoutInSeconds) {

LOG.info("Trie log pruner queue preload starting...");
LOG.atInfo()
.setMessage("Attempting to load first {} trie logs from database...")
.addArgument(loadingLimit)
.log();

try (final ScheduledExecutorService preloadExecutor = Executors.newScheduledThreadPool(1)) {
try (final ExecutorService preloadExecutor = Executors.newSingleThreadExecutor()) {
final Future<?> future = preloadExecutor.submit(this::preloadQueue);

final AtomicBoolean timeoutOccurred = new AtomicBoolean(false);
final Runnable timeoutTask =
() -> {
timeoutOccurred.set(true);
LOG.atWarn()
.setMessage(
"Timeout occurred while loading and processing {} trie logs from database")
.addArgument(loadingLimit)
.log();
};

final ScheduledFuture<?> timeoutFuture =
preloadExecutor.schedule(timeoutTask, PRELOAD_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
LOG.atInfo()
.setMessage(
"Trie log pruning will timeout after {} seconds. If this is timing out, consider using `besu storage trie-log prune` subcommand, see https://besu.hyperledger.org/public-networks/how-to/bonsai-limit-trie-logs")
.addArgument(PRELOAD_TIMEOUT_IN_SECONDS)
.addArgument(timeoutInSeconds)
.log();

preloadQueue(timeoutOccurred, timeoutFuture);
try {
future.get(timeoutInSeconds, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Error loading trie logs from database", e);
future.cancel(true);
} catch (TimeoutException e) {
future.cancel(true);
LOG.atWarn()
.setMessage("Timeout occurred while loading and processing {} trie logs from database")
.addArgument(loadingLimit)
.log();
}
}
LOG.info("Trie log pruner queue preload complete.");
}

private void preloadQueue(
final AtomicBoolean timeoutOccurred, final ScheduledFuture<?> timeoutFuture) {
private void preloadQueue() {

try (final Stream<byte[]> trieLogKeys = rootWorldStateStorage.streamTrieLogKeys(loadingLimit)) {

final AtomicLong addToPruneQueueCount = new AtomicLong();
final AtomicLong orphansPruned = new AtomicLong();
trieLogKeys.forEach(
blockHashAsBytes -> {
if (timeoutOccurred.get()) {
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException(
new TimeoutException("Timeout occurred while preloading trie log prune queue"));
new InterruptedException("Thread interrupted during trie log processing."));
}
final Hash blockHash = Hash.wrap(Bytes32.wrap(blockHashAsBytes));
final Optional<BlockHeader> header = blockchain.getBlockHeader(blockHash);
Expand All @@ -152,17 +154,19 @@ private void preloadQueue(
}
});

timeoutFuture.cancel(true);
LOG.atDebug().log("Pruned {} orphaned trie logs from database...", orphansPruned.intValue());
LOG.atInfo().log(
"Added {} trie logs to prune queue. Commencing pruning of eligible trie logs...",
addToPruneQueueCount.intValue());
int prunedCount = pruneFromQueue();
LOG.atInfo().log("Pruned {} trie logs.", prunedCount);
LOG.atInfo().log("Pruned {} trie logs", prunedCount);
} catch (Exception e) {
if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
if (e instanceof InterruptedException
|| (e.getCause() != null && e.getCause() instanceof InterruptedException)) {
LOG.info("Operation interrupted, but will attempt to prune what's in the queue so far...");
int prunedCount = pruneFromQueue();
LOG.atInfo().log("Operation timed out, but still pruned {} trie logs.", prunedCount);
LOG.atInfo().log("...pruned {} trie logs", prunedCount);
Thread.currentThread().interrupt(); // Preserve interrupt status
} else {
LOG.error("Error loading trie logs from database, nothing pruned", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.trie.diffbased.bonsai.trielog;
package org.hyperledger.besu.ethereum.trie.diffbased.common.trielog;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -26,9 +26,6 @@
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogAddedEvent;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogLayer;
import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogPruner;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;

import java.util.Optional;
Expand All @@ -43,6 +40,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.AnswersWithDelay;

public class TrieLogPrunerTest {

Expand Down Expand Up @@ -82,6 +80,53 @@ public void initialize_preloads_queue_and_prunes_orphaned_blocks() {
verify(worldState, times(1)).pruneTrieLog(header2.getBlockHash());
}

@Test
public void preloadQueueWithTimeout_handles_timeout_during_streamTrieLogKeys() {
// Given
final int timeoutInSeconds = 1;
final long timeoutInMillis = timeoutInSeconds * 1000;
final int loadingLimit = 2;
TrieLogPruner trieLogPruner =
new TrieLogPruner(
worldState, blockchain, executeAsync, 3, loadingLimit, false, new NoOpMetricsSystem());

// Simulate a long-running operation
when(worldState.streamTrieLogKeys(loadingLimit))
.thenAnswer(new AnswersWithDelay(timeoutInMillis * 2, invocation -> Stream.empty()));

// When
long startTime = System.currentTimeMillis();
trieLogPruner.preloadQueueWithTimeout(timeoutInSeconds);
long elapsedTime = System.currentTimeMillis() - startTime;

// Then
assertThat(elapsedTime).isLessThan(timeoutInMillis * 2);
}

@Test
public void preloadQueueWithTimeout_handles_timeout_during_getBlockHeader() {
// Given
final int timeoutInSeconds = 1;
final long timeoutInMillis = timeoutInSeconds * 1000;
TrieLogPruner trieLogPruner = setupPrunerAndFinalizedBlock(3, 1);

// Simulate a long-running operation
when(blockchain.getBlockHeader(any(Hash.class)))
// delay on first invocation, then return empty
.thenAnswer(new AnswersWithDelay(timeoutInMillis * 2, invocation -> Optional.empty()))
.thenReturn(Optional.empty());

// When
long startTime = System.currentTimeMillis();
trieLogPruner.preloadQueueWithTimeout(timeoutInSeconds);
long elapsedTime = System.currentTimeMillis() - startTime;

// Then
assertThat(elapsedTime).isLessThan(timeoutInMillis * 2);
verify(worldState, times(1)).pruneTrieLog(key(1));
verify(worldState, times(1)).pruneTrieLog(key(2));
}

@Test
public void trieLogs_pruned_in_reverse_order_within_pruning_window() {
// Given
Expand Down

0 comments on commit ec8429f

Please sign in to comment.