Skip to content

Commit

Permalink
feat: Add support for consensus time period in blocks (#17536)
Browse files Browse the repository at this point in the history
Signed-off-by: Derek Riley <[email protected]>
  • Loading branch information
derektriley authored Feb 12, 2025
1 parent 222f269 commit 570df2e
Show file tree
Hide file tree
Showing 19 changed files with 664 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -367,7 +367,7 @@ public final class Hedera
/**
* The action to take, if any, when a consensus round is sealed.
*/
private final BiConsumer<Round, State> onSealConsensusRound;
private final BiPredicate<Round, State> onSealConsensusRound;
/**
* Once set, a future that resolves to the hash of the state used to initialize the application. This is known
* immediately at genesis or on restart from a saved state; during reconnect, it is known when reconnect
Expand Down Expand Up @@ -538,7 +538,7 @@ public Hedera(
() -> new PlatformMerkleStateRoot(ServicesSoftwareVersion::new);
final var blockStreamsEnabled = isBlockStreamEnabled();
stateRootSupplier = blockStreamsEnabled ? () -> withListeners(baseSupplier.get()) : baseSupplier;
onSealConsensusRound = blockStreamsEnabled ? this::manageBlockEndRound : (round, state) -> {};
onSealConsensusRound = blockStreamsEnabled ? this::manageBlockEndRound : (round, state) -> true;
// And the factory for the MerkleStateRoot class id must be our constructor
constructableRegistry.registerConstructable(
new ClassConstructorPair(PlatformMerkleStateRoot.class, stateRootSupplier));
Expand Down Expand Up @@ -970,9 +970,7 @@ public void onHandleConsensusRound(
public boolean onSealConsensusRound(@NonNull final Round round, @NonNull final State state) {
requireNonNull(state);
requireNonNull(round);
onSealConsensusRound.accept(round, state);
// This logic to be completed in https://github.com/hashgraph/hedera-services/issues/17469
return true;
return onSealConsensusRound.test(round, state);
}

/*==================================================================================================================
Expand Down Expand Up @@ -1287,8 +1285,8 @@ private PlatformMerkleStateRoot withListeners(@NonNull final PlatformMerkleState
return root;
}

private void manageBlockEndRound(@NonNull final Round round, @NonNull final State state) {
daggerApp.blockStreamManager().endRound(state, round.getRoundNum());
private boolean manageBlockEndRound(@NonNull final Round round, @NonNull final State state) {
return daggerApp.blockStreamManager().endRound(state, round.getRoundNum());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
* Copyright (C) 2024-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ enum PendingWork {
*
* @param state the mutable state of the network at the end of the round
* @param roundNum the number of the round that has just ended
* @return returns true if the round is the last round in the block
*/
void endRound(@NonNull State state, final long roundNum);
boolean endRound(@NonNull State state, long roundNum);

/**
* Writes a block item to the stream.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
* Copyright (C) 2024-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.security.MessageDigest;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.EnumSet;
Expand All @@ -94,6 +95,7 @@ public class BlockStreamManagerImpl implements BlockStreamManager {
private static final Logger log = LogManager.getLogger(BlockStreamManagerImpl.class);

private final int roundsPerBlock;
private final Duration blockPeriod;
private final BlockStreamWriterMode streamWriterType;
private final int hashCombineBatchSize;
private final BlockHashSigner blockHashSigner;
Expand Down Expand Up @@ -123,6 +125,7 @@ public class BlockStreamManagerImpl implements BlockStreamManager {
private long lastNonEmptyRoundNumber;
private Bytes lastBlockHash;
private Instant blockTimestamp;
private Instant consensusTimeLastRound;
private BlockItemWriter writer;
private StreamingTreeHasher inputTreeHasher;
private StreamingTreeHasher outputTreeHasher;
Expand Down Expand Up @@ -191,6 +194,7 @@ public BlockStreamManagerImpl(
this.hapiVersion = hapiVersionFrom(config);
final var blockStreamConfig = config.getConfigData(BlockStreamConfig.class);
this.roundsPerBlock = blockStreamConfig.roundsPerBlock();
this.blockPeriod = blockStreamConfig.blockPeriod();
this.streamWriterType = blockStreamConfig.writerMode();
this.hashCombineBatchSize = blockStreamConfig.hashCombineBatchSize();
final var networkAdminConfig = config.getConfigData(NetworkAdminConfig.class);
Expand Down Expand Up @@ -234,6 +238,8 @@ public void startRound(@NonNull final Round round, @NonNull final State state) {
// Track freeze round numbers because they always end a block
freezeRoundNumber = round.getRoundNum();
}

// Writer will be null when beginning a new block
if (writer == null) {
writer = writerSupplier.get();
// This iterator is never empty; c.f. DefaultTransactionHandler#handleConsensusRound()
Expand Down Expand Up @@ -268,6 +274,7 @@ public void startRound(@NonNull final Round round, @NonNull final State state) {
worker.addItem(BlockItem.newBuilder().blockHeader(header).build());
}
}
consensusTimeLastRound = round.getConsensusTimestamp();
}

@Override
Expand Down Expand Up @@ -312,7 +319,7 @@ public void setLastHandleTime(@NonNull final Instant lastHandleTime) {
}

@Override
public void endRound(@NonNull final State state, final long roundNum) {
public boolean endRound(@NonNull final State state, final long roundNum) {
if (shouldCloseBlock(roundNum, roundsPerBlock)) {
// If there were no user transactions in the block, this writes all the accumulated
// items starting from the header, sacrificing the benefits of concurrency; but
Expand Down Expand Up @@ -394,7 +401,9 @@ public void endRound(@NonNull final State state, final long roundNum) {
DiskStartupNetworks.writeNetworkInfo(
state, exportPath, EnumSet.allOf(InfoType.class), platformStateFacade);
}
return true;
}
return false;
}

@Override
Expand Down Expand Up @@ -525,10 +534,24 @@ private static boolean impliesPostUpgradeWorkPending(
}

private boolean shouldCloseBlock(final long roundNumber, final int roundsPerBlock) {
// We need the signer to be ready
if (!blockHashSigner.isReady()) {
return false;
}
return roundNumber % roundsPerBlock == 0 || roundNumber == freezeRoundNumber;

// During freeze round, we should close the block regardless of other conditions
if (roundNumber == freezeRoundNumber) {
return true;
}

// If blockPeriod is 0, use roundsPerBlock
if (blockPeriod.isZero()) {
return roundNumber % roundsPerBlock == 0;
}

// For time-based blocks, check if enough consensus time has elapsed
final var elapsed = Duration.between(blockTimestamp, consensusTimeLastRound);
return elapsed.compareTo(blockPeriod) >= 0;
}

private boolean isFreezeRound(@NonNull final PlatformState platformState, @NonNull final Round round) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ public void closeBlock() {
try {
writableStreamingData.close();
state = State.CLOSED;
// Write a .mf file to indicate that the block file is complete.
final Path markerFile = getBlockFilePath(blockNumber).resolveSibling(longToFileName(blockNumber) + ".mf");
if (Files.exists(markerFile)) {
logger.info("Skipping block marker file for {} as it already exists", markerFile);
} else {
Files.createFile(markerFile);
}
} catch (final IOException e) {
logger.error("Error closing the FileBlockItemWriter output stream", e);
throw new UncheckedIOException(e);
Expand All @@ -209,7 +216,7 @@ private Path getBlockFilePath(final long blockNumber) {
* @return the 36-character string padded with leading zeros
*/
@NonNull
private static String longToFileName(final long value) {
public static String longToFileName(final long value) {
// Convert the signed long to an unsigned long using BigInteger for correct representation
BigInteger unsignedValue =
BigInteger.valueOf(value & Long.MAX_VALUE).add(BigInteger.valueOf(Long.MIN_VALUE & value));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2024 Hedera Hashgraph, LLC
* Copyright (C) 2024-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Loading

0 comments on commit 570df2e

Please sign in to comment.