From 007b52914a303a988d312a906dcc3b32e3b61c1f Mon Sep 17 00:00:00 2001 From: Derek Riley Date: Fri, 21 Feb 2025 08:49:50 -0500 Subject: [PATCH] feat: Add support for consensus time period in blocks (#17536) (#17920) Signed-off-by: Derek Riley --- .../main/java/com/hedera/node/app/Hedera.java | 14 +- .../node/app/blocks/BlockStreamManager.java | 3 +- .../blocks/impl/BlockStreamManagerImpl.java | 27 +- .../app/blocks/impl/FileBlockItemWriter.java | 9 +- .../impl/BlockStreamManagerImplTest.java | 253 ++++++++++++++++-- .../blocks/impl/FileBlockItemWriterTest.java | 54 +++- .../node/config/data/BlockStreamConfig.java | 2 + .../bdd/junit/support/BlockStreamAccess.java | 32 ++- .../block/BlockContentsValidator.java | 125 +++++++-- .../services/bdd/spec/utilops/UtilVerbs.java | 9 + .../pauses/HapiSpecWaitUntilNextBlock.java | 130 +++++++++ .../suites/contract/records/RecordsSuite.java | 34 +-- .../suites/hip993/SystemFileExportsTest.java | 42 ++- .../ConcurrentIntegrationTests.java | 9 +- .../queries/AsNodeOperatorQueriesTest.java | 2 +- .../staking/RepeatableStakingTests.java | 5 +- 16 files changed, 634 insertions(+), 116 deletions(-) create mode 100644 hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/pauses/HapiSpecWaitUntilNextBlock.java diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/Hedera.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/Hedera.java index 740aa5cf6a12..569ed99d07f5 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/Hedera.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/Hedera.java @@ -160,7 +160,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.function.BiConsumer; +import java.util.function.BiPredicate; import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; @@ -350,7 +350,7 @@ public final class Hedera implements SwirldMain, PlatformStatus /** * The action to take, if any, when a consensus round is sealed. */ - private final BiConsumer onSealConsensusRound; + private final BiPredicate onSealConsensusRound; /** * Once set, a future that resolves to the hash of the state used to initialize the application. This is known * immediately at genesis or on restart from a saved state; during reconnect, it is known when reconnect @@ -521,7 +521,7 @@ public Hedera( final Supplier baseSupplier = HederaStateRoot::new; final var blockStreamsEnabled = isBlockStreamEnabled(); stateRootSupplier = blockStreamsEnabled ? () -> withListeners(baseSupplier.get()) : baseSupplier; - onSealConsensusRound = blockStreamsEnabled ? this::manageBlockEndRound : (round, state) -> {}; + onSealConsensusRound = blockStreamsEnabled ? this::manageBlockEndRound : (round, state) -> true; // And the factory for the MerkleStateRoot class id must be our constructor constructableRegistry.registerConstructable( new ClassConstructorPair(HederaStateRoot.class, stateRootSupplier::get)); @@ -953,9 +953,7 @@ public void onHandleConsensusRound( public boolean onSealConsensusRound(@NonNull final Round round, @NonNull final State state) { requireNonNull(state); requireNonNull(round); - onSealConsensusRound.accept(round, state); - // This logic to be completed in https://github.com/hashgraph/hedera-services/issues/17469 - return true; + return onSealConsensusRound.test(round, state); } /*================================================================================================================== @@ -1271,8 +1269,8 @@ private MerkleNodeState withListeners(@NonNull final MerkleNodeState root) { return root; } - private void manageBlockEndRound(@NonNull final Round round, @NonNull final State state) { - daggerApp.blockStreamManager().endRound(state, round.getRoundNum()); + private boolean manageBlockEndRound(@NonNull final Round round, @NonNull final State state) { + return daggerApp.blockStreamManager().endRound(state, round.getRoundNum()); } /** diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/BlockStreamManager.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/BlockStreamManager.java index bfc4c101e50f..a471fdf68232 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/BlockStreamManager.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/BlockStreamManager.java @@ -115,8 +115,9 @@ enum PendingWork { * * @param state the mutable state of the network at the end of the round * @param roundNum the number of the round that has just ended + * @return returns true if the round is the last round in the block */ - void endRound(@NonNull State state, final long roundNum); + boolean endRound(@NonNull State state, long roundNum); /** * Writes a block item to the stream. diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImpl.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImpl.java index d44ec171f06e..3bb828eea4fa 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImpl.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImpl.java @@ -53,6 +53,7 @@ import java.nio.ByteBuffer; import java.nio.file.Paths; import java.security.MessageDigest; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.EnumSet; @@ -75,6 +76,7 @@ public class BlockStreamManagerImpl implements BlockStreamManager { private static final Logger log = LogManager.getLogger(BlockStreamManagerImpl.class); private final int roundsPerBlock; + private final Duration blockPeriod; private final BlockStreamWriterMode streamWriterType; private final int hashCombineBatchSize; private final BlockHashSigner blockHashSigner; @@ -104,6 +106,7 @@ public class BlockStreamManagerImpl implements BlockStreamManager { private long lastNonEmptyRoundNumber; private Bytes lastBlockHash; private Instant blockTimestamp; + private Instant consensusTimeLastRound; private BlockItemWriter writer; private StreamingTreeHasher inputTreeHasher; private StreamingTreeHasher outputTreeHasher; @@ -172,6 +175,7 @@ public BlockStreamManagerImpl( this.hapiVersion = hapiVersionFrom(config); final var blockStreamConfig = config.getConfigData(BlockStreamConfig.class); this.roundsPerBlock = blockStreamConfig.roundsPerBlock(); + this.blockPeriod = blockStreamConfig.blockPeriod(); this.streamWriterType = blockStreamConfig.writerMode(); this.hashCombineBatchSize = blockStreamConfig.hashCombineBatchSize(); final var networkAdminConfig = config.getConfigData(NetworkAdminConfig.class); @@ -211,6 +215,8 @@ public void startRound(@NonNull final Round round, @NonNull final State state) { // Track freeze round numbers because they always end a block freezeRoundNumber = round.getRoundNum(); } + + // Writer will be null when beginning a new block if (writer == null) { writer = writerSupplier.get(); // This iterator is never empty; c.f. DefaultTransactionHandler#handleConsensusRound() @@ -245,6 +251,7 @@ public void startRound(@NonNull final Round round, @NonNull final State state) { worker.addItem(BlockItem.newBuilder().blockHeader(header).build()); } } + consensusTimeLastRound = round.getConsensusTimestamp(); } @Override @@ -289,7 +296,7 @@ public void setLastHandleTime(@NonNull final Instant lastHandleTime) { } @Override - public void endRound(@NonNull final State state, final long roundNum) { + public boolean endRound(@NonNull final State state, final long roundNum) { if (shouldCloseBlock(roundNum, roundsPerBlock)) { // If there were no user transactions in the block, this writes all the accumulated // items starting from the header, sacrificing the benefits of concurrency; but @@ -371,7 +378,9 @@ public void endRound(@NonNull final State state, final long roundNum) { DiskStartupNetworks.writeNetworkInfo( state, exportPath, EnumSet.allOf(InfoType.class), platformStateFacade); } + return true; } + return false; } @Override @@ -502,10 +511,24 @@ private static boolean impliesPostUpgradeWorkPending( } private boolean shouldCloseBlock(final long roundNumber, final int roundsPerBlock) { + // We need the signer to be ready if (!blockHashSigner.isReady()) { return false; } - return roundNumber % roundsPerBlock == 0 || roundNumber == freezeRoundNumber; + + // During freeze round, we should close the block regardless of other conditions + if (roundNumber == freezeRoundNumber) { + return true; + } + + // If blockPeriod is 0, use roundsPerBlock + if (blockPeriod.isZero()) { + return roundNumber % roundsPerBlock == 0; + } + + // For time-based blocks, check if enough consensus time has elapsed + final var elapsed = Duration.between(blockTimestamp, consensusTimeLastRound); + return elapsed.compareTo(blockPeriod) >= 0; } class BlockStreamManagerTask { diff --git a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/FileBlockItemWriter.java b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/FileBlockItemWriter.java index e9668e02fd07..f423d1235434 100644 --- a/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/FileBlockItemWriter.java +++ b/hedera-node/hedera-app/src/main/java/com/hedera/node/app/blocks/impl/FileBlockItemWriter.java @@ -170,6 +170,13 @@ public void closeBlock() { try { writableStreamingData.close(); state = State.CLOSED; + // Write a .mf file to indicate that the block file is complete. + final Path markerFile = getBlockFilePath(blockNumber).resolveSibling(longToFileName(blockNumber) + ".mf"); + if (Files.exists(markerFile)) { + logger.info("Skipping block marker file for {} as it already exists", markerFile); + } else { + Files.createFile(markerFile); + } } catch (final IOException e) { logger.error("Error closing the FileBlockItemWriter output stream", e); throw new UncheckedIOException(e); @@ -194,7 +201,7 @@ private Path getBlockFilePath(final long blockNumber) { * @return the 36-character string padded with leading zeros */ @NonNull - private static String longToFileName(final long value) { + public static String longToFileName(final long value) { // Convert the signed long to an unsigned long using BigInteger for correct representation BigInteger unsignedValue = BigInteger.valueOf(value & Long.MAX_VALUE).add(BigInteger.valueOf(Long.MIN_VALUE & value)); diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImplTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImplTest.java index 07206c67e9f4..96e9d4bb47d8 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImplTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/BlockStreamManagerImplTest.java @@ -8,7 +8,6 @@ import static com.hedera.node.app.blocks.BlockStreamService.FAKE_RESTART_BLOCK_HASH; import static com.hedera.node.app.blocks.impl.BlockImplUtils.appendHash; import static com.hedera.node.app.blocks.impl.BlockImplUtils.combine; -import static com.hedera.node.app.blocks.impl.BlockStreamManagerImpl.classifyPendingWork; import static com.hedera.node.app.blocks.schemas.V0560BlockStreamSchema.BLOCK_STREAM_INFO_KEY; import static com.hedera.node.app.fixtures.AppTestBase.DEFAULT_CONFIG; import static com.hedera.node.app.hapi.utils.CommonUtils.noThrowSha384HashOf; @@ -26,6 +25,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -51,6 +51,7 @@ import com.hedera.node.app.records.BlockRecordService; import com.hedera.node.config.ConfigProvider; import com.hedera.node.config.VersionedConfigImpl; +import com.hedera.node.config.data.BlockStreamConfig; import com.hedera.node.config.testfixtures.HederaTestConfigBuilder; import com.hedera.pbj.runtime.ParseException; import com.hedera.pbj.runtime.io.buffer.Bytes; @@ -66,7 +67,10 @@ import com.swirlds.state.spi.WritableStates; import edu.umd.cs.findbugs.annotations.NonNull; import edu.umd.cs.findbugs.annotations.Nullable; +import java.time.Duration; import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; @@ -139,6 +143,12 @@ class BlockStreamManagerImplTest { @Mock private State state; + @Mock + private Iterator mockIterator; + + @Mock + private ConsensusEvent mockEvent; + private final AtomicReference lastAItem = new AtomicReference<>(); private final AtomicReference lastBItem = new AtomicReference<>(); private final AtomicReference stateRef = new AtomicReference<>(); @@ -157,14 +167,14 @@ void setUp() { void classifiesPendingGenesisWorkByIntervalTime() { assertSame( BlockStreamManager.PendingWork.GENESIS_WORK, - classifyPendingWork(BlockStreamInfo.DEFAULT, SemanticVersion.DEFAULT)); + BlockStreamManagerImpl.classifyPendingWork(BlockStreamInfo.DEFAULT, SemanticVersion.DEFAULT)); } @Test void classifiesPriorVersionHasPostUpgradeWorkWithDifferentVersionButIntervalTime() { assertSame( POST_UPGRADE_WORK, - classifyPendingWork( + BlockStreamManagerImpl.classifyPendingWork( BlockStreamInfo.newBuilder() .creationSoftwareVersion( SemanticVersion.newBuilder().major(1).build()) @@ -177,7 +187,7 @@ void classifiesPriorVersionHasPostUpgradeWorkWithDifferentVersionButIntervalTime void classifiesNonGenesisBlockOfSameVersionWithWorkNotDoneStillHasPostUpgradeWork() { assertEquals( POST_UPGRADE_WORK, - classifyPendingWork( + BlockStreamManagerImpl.classifyPendingWork( BlockStreamInfo.newBuilder() .creationSoftwareVersion(CREATION_VERSION) .lastIntervalProcessTime(new Timestamp(1234567, 890)) @@ -189,7 +199,7 @@ void classifiesNonGenesisBlockOfSameVersionWithWorkNotDoneStillHasPostUpgradeWor void classifiesNonGenesisBlockOfSameVersionWithWorkDoneAsNoWork() { assertSame( NONE, - classifyPendingWork( + BlockStreamManagerImpl.classifyPendingWork( BlockStreamInfo.newBuilder() .postUpgradeWorkDone(true) .creationSoftwareVersion(CREATION_VERSION) @@ -238,12 +248,14 @@ void requiresLastHashToBeInitialized() { void startsAndEndsBlockWithSingleRoundPerBlockAsExpected() throws ParseException { givenSubjectWith( 1, + 0, blockStreamInfoWith( Bytes.EMPTY, CREATION_VERSION.copyBuilder().patch(0).build()), platformStateWithFreezeTime(null), aWriter); givenEndOfRoundSetup(); given(boundaryStateChangeListener.boundaryTimestampOrThrow()).willReturn(Timestamp.DEFAULT); + given(round.getConsensusTimestamp()).willReturn(CONSENSUS_NOW); given(round.getRoundNum()).willReturn(ROUND_NO); // Initialize the last (N-1) block hash @@ -325,11 +337,16 @@ void startsAndEndsBlockWithSingleRoundPerBlockAsExpected() throws ParseException void doesNotEndBlockEvenAtModZeroRoundIfSignerIsNotReady() { givenSubjectWith( 1, + 0, blockStreamInfoWith( Bytes.EMPTY, CREATION_VERSION.copyBuilder().patch(0).build()), platformStateWithFreezeTime(null), aWriter); - given(round.getRoundNum()).willReturn(ROUND_NO); + givenEndOfRoundSetup(); + lenient().when(round.getRoundNum()).thenReturn(ROUND_NO); + lenient().when(round.getConsensusTimestamp()).thenReturn(CONSENSUS_NOW); + lenient().when(blockHashSigner.isReady()).thenReturn(false); + lenient().when(boundaryStateChangeListener.boundaryTimestampOrThrow()).thenReturn(Timestamp.DEFAULT); // Initialize the last (N-1) block hash subject.initLastBlockHash(FAKE_RESTART_BLOCK_HASH); @@ -353,6 +370,7 @@ void doesNotEndBlockEvenAtModZeroRoundIfSignerIsNotReady() { // End the round (which cannot close the block since signer isn't ready) subject.endRound(state, ROUND_NO); + // Verify signer was checked but never asked to sign verify(blockHashSigner, never()).signFuture(any()); } @@ -360,6 +378,7 @@ void doesNotEndBlockEvenAtModZeroRoundIfSignerIsNotReady() { void blockWithNoUserTransactionsHasExpectedHeader() { givenSubjectWith( 1, + 0, blockStreamInfoWith( Bytes.EMPTY, CREATION_VERSION.copyBuilder().patch(0).build()), platformStateWithFreezeTime(null), @@ -367,6 +386,7 @@ void blockWithNoUserTransactionsHasExpectedHeader() { final AtomicReference writtenHeader = new AtomicReference<>(); givenEndOfRoundSetup(writtenHeader); given(boundaryStateChangeListener.boundaryTimestampOrThrow()).willReturn(Timestamp.DEFAULT); + given(round.getConsensusTimestamp()).willReturn(CONSENSUS_NOW); given(round.getRoundNum()).willReturn(ROUND_NO); // Initialize the last (N-1) block hash @@ -410,7 +430,21 @@ void blockWithNoUserTransactionsHasExpectedHeader() { @Test void doesNotEndBlockWithMultipleRoundPerBlockIfNotModZero() { givenSubjectWith( - 7, blockStreamInfoWith(Bytes.EMPTY, CREATION_VERSION), platformStateWithFreezeTime(null), aWriter); + 2, + 0, + blockStreamInfoWith( + Bytes.EMPTY, CREATION_VERSION.copyBuilder().patch(0).build()), + platformStateWithFreezeTime(null), + aWriter); + givenEndOfRoundSetup(); + given(round.getRoundNum()).willReturn(ROUND_NO); + given(round.getConsensusTimestamp()).willReturn(CONSENSUS_NOW); + given(state.getReadableStates(BlockStreamService.NAME)).willReturn(readableStates); + given(state.getReadableStates(PlatformStateService.NAME)).willReturn(readableStates); + given(readableStates.getSingleton(BLOCK_STREAM_INFO_KEY)) + .willReturn(blockStreamInfoState); + given(readableStates.getSingleton(PLATFORM_STATE_KEY)) + .willReturn(new WritableSingletonStateBase<>(PLATFORM_STATE_KEY, stateRef::get, stateRef::set)); // Initialize the last (N-1) block hash subject.initLastBlockHash(FAKE_RESTART_BLOCK_HASH); @@ -441,12 +475,14 @@ void doesNotEndBlockWithMultipleRoundPerBlockIfNotModZero() { void alwaysEndsBlockOnFreezeRoundPerBlockAsExpected() throws ParseException { final var resultHashes = Bytes.fromHex("aa".repeat(48) + "bb".repeat(48) + "cc".repeat(48) + "dd".repeat(48)); givenSubjectWith( - 7, + 2, + 2, // Use time-based blocks with 2 second period blockStreamInfoWith(resultHashes, CREATION_VERSION), - platformStateWithFreezeTime(CONSENSUS_NOW.minusSeconds(1)), + platformStateWithFreezeTime(CONSENSUS_NOW), aWriter); givenEndOfRoundSetup(); given(round.getRoundNum()).willReturn(ROUND_NO); + given(round.getConsensusTimestamp()).willReturn(CONSENSUS_NOW); given(boundaryStateChangeListener.boundaryTimestampOrThrow()).willReturn(Timestamp.DEFAULT); // Initialize the last (N-1) block hash @@ -526,6 +562,7 @@ void supportsMultiplePendingBlocksWithIndirectProofAsExpected() throws ParseExce given(blockHashSigner.isReady()).willReturn(true); givenSubjectWith( 1, + 0, blockStreamInfoWith(Bytes.EMPTY, CREATION_VERSION), platformStateWithFreezeTime(null), aWriter, @@ -538,6 +575,7 @@ void supportsMultiplePendingBlocksWithIndirectProofAsExpected() throws ParseExce .when(bWriter) .writePbjItem(any()); given(round.getRoundNum()).willReturn(ROUND_NO); + given(round.getConsensusTimestamp()).willReturn(CONSENSUS_NOW); given(boundaryStateChangeListener.boundaryTimestampOrThrow()).willReturn(Timestamp.DEFAULT); // Initialize the last (N-1) block hash @@ -558,6 +596,7 @@ void supportsMultiplePendingBlocksWithIndirectProofAsExpected() throws ParseExce // Start the round that will be block N+1 given(round.getRoundNum()).willReturn(ROUND_NO + 1); + given(round.getConsensusTimestamp()).willReturn(CONSENSUS_NOW.plusSeconds(1)); // Next round timestamp given(notification.round()).willReturn(ROUND_NO); given(notification.hash()).willReturn(FAKE_START_OF_BLOCK_STATE_HASH); // Notify the subject of the required start-of-state hash @@ -598,18 +637,174 @@ void supportsMultiplePendingBlocksWithIndirectProofAsExpected() throws ParseExce assertTrue(bProof.siblingHashes().isEmpty()); } + @Test + void createsBlockWhenTimePeriodElapses() { + // Given a 2 second block period + givenSubjectWith( + 1, + 2, + blockStreamInfoWith( + Bytes.EMPTY, CREATION_VERSION.copyBuilder().patch(0).build()), + platformStateWithFreezeTime(null), + aWriter); + givenEndOfRoundSetup(); + given(boundaryStateChangeListener.boundaryTimestampOrThrow()).willReturn(Timestamp.DEFAULT); + given(round.getRoundNum()).willReturn(ROUND_NO); + given(blockHashSigner.isReady()).willReturn(true); + + given(mockEvent.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1000)); + + // Set up the signature future to complete immediately and run the callback synchronously + given(blockHashSigner.signFuture(any())).willReturn(mockSigningFuture); + doAnswer(invocationOnMock -> { + final Consumer consumer = invocationOnMock.getArgument(0); + consumer.accept(FIRST_FAKE_SIGNATURE); + return null; + }) + .when(mockSigningFuture) + .thenAcceptAsync(any()); + + // When starting a round at t=0 + given(round.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1000)); + subject.initLastBlockHash(N_MINUS_2_BLOCK_HASH); + subject.startRound(round, state); + + // And another round at t=1 + given(round.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1001)); + subject.startRound(round, state); + subject.endRound(state, ROUND_NO); + + // Then block should not be closed + verify(aWriter, never()).closeBlock(); + + // When starting another round at t=3 (after period) + given(round.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1003)); + subject.startRound(round, state); + subject.endRound(state, ROUND_NO); + + // Then block should be closed + verify(aWriter).closeBlock(); + } + + @Test + void doesNotCreateBlockWhenTimePeriodNotElapsed() { + // Given a 2 second block period + givenSubjectWith( + 1, + 2, + blockStreamInfoWith( + Bytes.EMPTY, CREATION_VERSION.copyBuilder().patch(0).build()), + platformStateWithFreezeTime(null), + aWriter); + givenEndOfRoundSetup(); + given(round.getRoundNum()).willReturn(ROUND_NO); + given(blockHashSigner.isReady()).willReturn(true); + + // When starting a round at t=0 + given(round.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1000)); + subject.initLastBlockHash(N_MINUS_2_BLOCK_HASH); + subject.startRound(round, state); + + // And another round at t=1.5 + given(round.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1001, 500_000_000)); + subject.startRound(round, state); + subject.endRound(state, ROUND_NO); + + // Then block should not be closed + verify(aWriter, never()).closeBlock(); + } + + @Test + void alwaysEndsBlockOnFreezeRoundEvenIfPeriodNotElapsed() throws ParseException { + // Given a 2 second block period + givenSubjectWith( + 1, + 2, + blockStreamInfoWith( + Bytes.EMPTY, CREATION_VERSION.copyBuilder().patch(0).build()), + platformStateWithFreezeTime(Instant.ofEpochSecond(1001)), + aWriter); + givenEndOfRoundSetup(); + given(boundaryStateChangeListener.boundaryTimestampOrThrow()).willReturn(Timestamp.DEFAULT); + given(round.getRoundNum()).willReturn(ROUND_NO); + given(blockHashSigner.isReady()).willReturn(true); + + // Set up the signature future to complete immediately and run the callback synchronously + given(blockHashSigner.signFuture(any())).willReturn(mockSigningFuture); + doAnswer(invocationOnMock -> { + final Consumer consumer = invocationOnMock.getArgument(0); + consumer.accept(FIRST_FAKE_SIGNATURE); + return null; + }) + .when(mockSigningFuture) + .thenAcceptAsync(any()); + + // When starting a round at t=0 + given(round.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1000)); + subject.initLastBlockHash(N_MINUS_2_BLOCK_HASH); + subject.startRound(round, state); + + // And another round at t=1 with freeze + given(round.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1001)); + subject.startRound(round, state); + subject.endRound(state, ROUND_NO); + + // Then block should be closed due to freeze, even though period not elapsed + verify(aWriter).closeBlock(); + } + + @Test + void usesRoundsPerBlockWhenBlockPeriodIsZero() throws ParseException { + // Given blockPeriodSeconds=0 and roundsPerBlock=2 + givenSubjectWith( + 2, + 0, + blockStreamInfoWith( + Bytes.EMPTY, CREATION_VERSION.copyBuilder().patch(0).build()), + platformStateWithFreezeTime(null), + aWriter); + givenEndOfRoundSetup(); + given(boundaryStateChangeListener.boundaryTimestampOrThrow()).willReturn(Timestamp.DEFAULT); + given(blockHashSigner.isReady()).willReturn(true); + + // Set up the signature future to complete immediately and run the callback synchronously + given(blockHashSigner.signFuture(any())).willReturn(mockSigningFuture); + doAnswer(invocationOnMock -> { + final Consumer consumer = invocationOnMock.getArgument(0); + consumer.accept(FIRST_FAKE_SIGNATURE); + return null; + }) + .when(mockSigningFuture) + .thenAcceptAsync(any()); + + // When processing rounds + given(round.getConsensusTimestamp()).willReturn(Instant.ofEpochSecond(1000)); + subject.initLastBlockHash(N_MINUS_2_BLOCK_HASH); + + // First round (not mod 2) + given(round.getRoundNum()).willReturn(1L); + subject.startRound(round, state); + subject.endRound(state, 1L); + verify(aWriter, never()).closeBlock(); + + // Second round (mod 2) + given(round.getRoundNum()).willReturn(2L); + subject.startRound(round, state); + subject.endRound(state, 2L); + verify(aWriter).closeBlock(); + } + private void givenSubjectWith( final int roundsPerBlock, + final int blockPeriod, @NonNull final BlockStreamInfo blockStreamInfo, @NonNull final PlatformState platformState, @NonNull final BlockItemWriter... writers) { - given(round.getConsensusTimestamp()).willReturn(CONSENSUS_NOW); - given(round.iterator()) - .willAnswer(invocationOnMock -> List.of(consensusEvent).iterator()); - given(consensusEvent.getConsensusTimestamp()).willReturn(CONSENSUS_NOW); final AtomicInteger nextWriter = new AtomicInteger(0); final var config = HederaTestConfigBuilder.create() + .withConfigDataType(BlockStreamConfig.class) .withValue("blockStream.roundsPerBlock", roundsPerBlock) + .withValue("blockStream.blockPeriod", Duration.of(blockPeriod, ChronoUnit.SECONDS)) .getOrCreateConfig(); given(configProvider.getConfiguration()).willReturn(new VersionedConfigImpl(config, 1L)); subject = new BlockStreamManagerImpl( @@ -626,10 +821,6 @@ private void givenSubjectWith( infoRef.set(blockStreamInfo); stateRef.set(platformState); blockStreamInfoState = new WritableSingletonStateBase<>(BLOCK_STREAM_INFO_KEY, infoRef::get, infoRef::set); - given(readableStates.getSingleton(BLOCK_STREAM_INFO_KEY)) - .willReturn(blockStreamInfoState); - given(readableStates.getSingleton(PLATFORM_STATE_KEY)) - .willReturn(new WritableSingletonStateBase<>(PLATFORM_STATE_KEY, stateRef::get, stateRef::set)); } private void givenEndOfRoundSetup() { @@ -637,8 +828,13 @@ private void givenEndOfRoundSetup() { } private void givenEndOfRoundSetup(@Nullable final AtomicReference headerRef) { - given(boundaryStateChangeListener.flushChanges()).willReturn(FAKE_STATE_CHANGES); - doAnswer(invocationOnMock -> { + // Add mock for round iterator + lenient().when(round.iterator()).thenReturn(mockIterator); + lenient().when(mockIterator.next()).thenReturn(mockEvent); + lenient().when(mockEvent.getConsensusTimestamp()).thenReturn(CONSENSUS_NOW); + lenient().when(boundaryStateChangeListener.flushChanges()).thenReturn(FAKE_STATE_CHANGES); + lenient() + .doAnswer(invocationOnMock -> { lastAItem.set(invocationOnMock.getArgument(0)); if (headerRef != null) { final var item = BlockItem.PROTOBUF.parse(lastAItem.get()); @@ -650,10 +846,20 @@ private void givenEndOfRoundSetup(@Nullable final AtomicReference h }) .when(aWriter) .writePbjItem(any()); - given(state.getWritableStates(BlockStreamService.NAME)).willReturn(writableStates); - given(writableStates.getSingleton(BLOCK_STREAM_INFO_KEY)) - .willReturn(blockStreamInfoState); - doAnswer(invocationOnMock -> { + lenient().when(state.getWritableStates(BlockStreamService.NAME)).thenReturn(writableStates); + lenient().when(state.getReadableStates(BlockStreamService.NAME)).thenReturn(readableStates); + lenient().when(state.getReadableStates(PlatformStateService.NAME)).thenReturn(readableStates); + lenient() + .when(writableStates.getSingleton(BLOCK_STREAM_INFO_KEY)) + .thenReturn(blockStreamInfoState); + lenient() + .when(readableStates.getSingleton(BLOCK_STREAM_INFO_KEY)) + .thenReturn(blockStreamInfoState); + lenient() + .when(readableStates.getSingleton(PLATFORM_STATE_KEY)) + .thenReturn(new WritableSingletonStateBase<>(PLATFORM_STATE_KEY, stateRef::get, stateRef::set)); + lenient() + .doAnswer(invocationOnMock -> { blockStreamInfoState.commit(); return null; }) @@ -669,6 +875,7 @@ private BlockStreamInfo blockStreamInfoWith( .trailingBlockHashes(appendHash(N_MINUS_2_BLOCK_HASH, Bytes.EMPTY, 256)) .trailingOutputHashes(resultHashes) .lastIntervalProcessTime(CONSENSUS_THEN) + .blockTime(asTimestamp(CONSENSUS_NOW.minusSeconds(5))) // Add block time to track last block creation .build(); } diff --git a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/FileBlockItemWriterTest.java b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/FileBlockItemWriterTest.java index d04b199bf32a..e6ab5ec4ac7b 100644 --- a/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/FileBlockItemWriterTest.java +++ b/hedera-node/hedera-app/src/test/java/com/hedera/node/app/blocks/impl/FileBlockItemWriterTest.java @@ -29,6 +29,9 @@ @ExtendWith(MockitoExtension.class) public class FileBlockItemWriterTest { + private static final String MF = "000000000000000000000000000000000001.mf"; + private static final String BLK_GZ = "000000000000000000000000000000000001.blk.gz"; + @TempDir Path tempDir; @@ -48,7 +51,7 @@ public class FileBlockItemWriterTest { private FileSystem fileSystem; @Test - public void testOpenBlock() { + protected void testOpenBlock() { when(configProvider.getConfiguration()).thenReturn(versionedConfiguration); when(versionedConfiguration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); when(blockStreamConfig.compressFilesOnCreation()).thenReturn(true); @@ -59,16 +62,20 @@ public void testOpenBlock() { fileBlockItemWriter.openBlock(1); // Assertion to check if the directory is created - Path expectedDirectory = tempDir.resolve("block-0.0.3"); + final Path expectedDirectory = tempDir.resolve("block-0.0.3"); assertThat(Files.exists(expectedDirectory)).isTrue(); // Assertion to check if the block file is created - Path expectedBlockFile = expectedDirectory.resolve("000000000000000000000000000000000001.blk.gz"); + final Path expectedBlockFile = expectedDirectory.resolve(BLK_GZ); assertThat(Files.exists(expectedBlockFile)).isTrue(); + + // Marker file should not exist yet since block is not closed + final Path expectedMarkerFile = expectedDirectory.resolve(MF); + assertThat(Files.exists(expectedMarkerFile)).isFalse(); } @Test - public void testOpenBlockCannotInitializeTwice() { + protected void testOpenBlockCannotInitializeTwice() { when(configProvider.getConfiguration()).thenReturn(versionedConfiguration); when(versionedConfiguration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); when(blockStreamConfig.compressFilesOnCreation()).thenReturn(true); @@ -87,7 +94,7 @@ public void testOpenBlockCannotInitializeTwice() { } @Test - public void testOpenBlockNegativeBlockNumber() { + protected void testOpenBlockNegativeBlockNumber() { when(configProvider.getConfiguration()).thenReturn(versionedConfiguration); when(versionedConfiguration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); when(blockStreamConfig.compressFilesOnCreation()).thenReturn(true); @@ -101,7 +108,7 @@ public void testOpenBlockNegativeBlockNumber() { } @Test - public void testWriteItem() throws IOException { + protected void testWriteItem() throws IOException { when(configProvider.getConfiguration()).thenReturn(versionedConfiguration); when(versionedConfiguration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); when(blockStreamConfig.compressFilesOnCreation()).thenReturn(true); @@ -121,8 +128,16 @@ public void testWriteItem() throws IOException { // Close the block fileBlockItemWriter.closeBlock(); - // Read the contents of the file - Path expectedBlockFile = tempDir.resolve("block-0.0.3").resolve("000000000000000000000000000000000001.blk.gz"); + Path expectedDirectory = tempDir.resolve("block-0.0.3"); + final Path expectedBlockFile = expectedDirectory.resolve("000000000000000000000000000000000001.blk.gz"); + final Path expectedMarkerFile = expectedDirectory.resolve(MF); + + // Verify both block file and marker file exist + assertThat(Files.exists(expectedBlockFile)).isTrue(); + assertThat(Files.exists(expectedMarkerFile)).isTrue(); + + // Verify marker file is empty + assertThat(Files.size(expectedMarkerFile)).isZero(); // Ungzip the file try (GZIPInputStream gzis = new GZIPInputStream(Files.newInputStream(expectedBlockFile))) { @@ -135,7 +150,7 @@ public void testWriteItem() throws IOException { } @Test - public void testWriteItemBeforeOpen() { + protected void testWriteItemBeforeOpen() { when(configProvider.getConfiguration()).thenReturn(versionedConfiguration); when(versionedConfiguration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); when(blockStreamConfig.compressFilesOnCreation()).thenReturn(true); @@ -152,7 +167,7 @@ public void testWriteItemBeforeOpen() { } @Test - public void testCloseBlock() { + protected void testCloseBlock() throws IOException { when(configProvider.getConfiguration()).thenReturn(versionedConfiguration); when(versionedConfiguration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); when(blockStreamConfig.compressFilesOnCreation()).thenReturn(true); @@ -167,14 +182,20 @@ public void testCloseBlock() { // Close the block fileBlockItemWriter.closeBlock(); - // Read the contents of the file - Path expectedBlockFile = tempDir.resolve("block-0.0.3").resolve("000000000000000000000000000000000001.blk.gz"); + Path expectedDirectory = tempDir.resolve("block-0.0.3"); + Path expectedBlockFile = expectedDirectory.resolve("000000000000000000000000000000000001.blk.gz"); + Path expectedMarkerFile = expectedDirectory.resolve(MF); + // Verify both block file and marker file exist assertThat(Files.exists(expectedBlockFile)).isTrue(); + assertThat(Files.exists(expectedMarkerFile)).isTrue(); + + // Verify marker file is empty + assertThat(Files.size(expectedMarkerFile)).isZero(); } @Test - public void testCloseBlockNotOpen() { + protected void testCloseBlockNotOpen() { when(configProvider.getConfiguration()).thenReturn(versionedConfiguration); when(versionedConfiguration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); when(blockStreamConfig.compressFilesOnCreation()).thenReturn(true); @@ -188,7 +209,7 @@ public void testCloseBlockNotOpen() { } @Test - public void testCloseBlockAlreadyClosed() { + protected void testCloseBlockAlreadyClosed() { when(configProvider.getConfiguration()).thenReturn(versionedConfiguration); when(versionedConfiguration.getConfigData(BlockStreamConfig.class)).thenReturn(blockStreamConfig); when(blockStreamConfig.compressFilesOnCreation()).thenReturn(true); @@ -203,6 +224,11 @@ public void testCloseBlockAlreadyClosed() { // Close the block fileBlockItemWriter.closeBlock(); + // Verify marker file exists before attempting second close + Path expectedDirectory = tempDir.resolve("block-0.0.3"); + Path expectedMarkerFile = expectedDirectory.resolve(MF); + assertThat(Files.exists(expectedMarkerFile)).isTrue(); + assertThatThrownBy(fileBlockItemWriter::closeBlock, "Cannot close a FileBlockItemWriter that is already closed") .isInstanceOf(IllegalStateException.class); } diff --git a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockStreamConfig.java b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockStreamConfig.java index 25c2d019b80c..8518fed90782 100644 --- a/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockStreamConfig.java +++ b/hedera-node/hedera-config/src/main/java/com/hedera/node/config/data/BlockStreamConfig.java @@ -9,6 +9,7 @@ import com.swirlds.config.api.ConfigProperty; import com.swirlds.config.api.validation.annotation.Max; import com.swirlds.config.api.validation.annotation.Min; +import java.time.Duration; /** * Configuration for the block stream. @@ -27,5 +28,6 @@ public record BlockStreamConfig( @ConfigProperty(defaultValue = "true") @NetworkProperty boolean compressFilesOnCreation, @ConfigProperty(defaultValue = "32") @NetworkProperty int hashCombineBatchSize, @ConfigProperty(defaultValue = "1") @NetworkProperty int roundsPerBlock, + @ConfigProperty(defaultValue = "2s") @Min(0) @NetworkProperty Duration blockPeriod, @ConfigProperty(defaultValue = "localhost") String grpcAddress, @ConfigProperty(defaultValue = "8080") @Min(0) @Max(65535) int grpcPort) {} diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/BlockStreamAccess.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/BlockStreamAccess.java index 68e7e2e991b8..f38762489998 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/BlockStreamAccess.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/BlockStreamAccess.java @@ -212,18 +212,44 @@ private List orderedBlocksFrom(@NonNull final Path path) throws IOExceptio } private static boolean isBlockFile(@NonNull final Path path) { - return path.toFile().isFile() && extractBlockNumber(path) != -1; + if (!path.toFile().isFile() || extractBlockNumber(path) == -1) { + return false; + } + // Check for marker file + final Path markerFile = path.resolveSibling(path.getFileName() + .toString() + .replace(COMPRESSED_FILE_EXT, ".mf") + .replace(UNCOMPRESSED_FILE_EXT, ".mf")); + return Files.exists(markerFile); } - private static long extractBlockNumber(@NonNull final Path path) { + /** + * Extracts the block number from the given path. + * + * @param path the path + * @return the block number + */ + public static long extractBlockNumber(@NonNull final Path path) { return extractBlockNumber(path.getFileName().toString()); } + /** + * Checks if the given file is a block file. + * + * @param file the file + * @return true if the file is a block file, false otherwise + */ public static boolean isBlockFile(@NonNull final File file) { return file.isFile() && extractBlockNumber(file.getName()) != -1; } - private static long extractBlockNumber(@NonNull final String fileName) { + /** + * Extracts the block number from the given file name. + * + * @param fileName the file name + * @return the block number, or -1 if it cannot be extracted + */ + public static long extractBlockNumber(@NonNull final String fileName) { try { final var blockNumber = fileName.substring(0, fileName.indexOf(UNCOMPRESSED_FILE_EXT)); return Long.parseLong(blockNumber); diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/validators/block/BlockContentsValidator.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/validators/block/BlockContentsValidator.java index e605d390ab1d..8ba15d2363ed 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/validators/block/BlockContentsValidator.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/junit/support/validators/block/BlockContentsValidator.java @@ -6,6 +6,7 @@ import com.hedera.hapi.block.stream.Block; import com.hedera.hapi.block.stream.BlockItem; +import com.hedera.hapi.block.stream.output.TransactionOutput; import com.hedera.services.bdd.junit.support.BlockStreamAccess; import com.hedera.services.bdd.junit.support.BlockStreamValidator; import com.hedera.services.bdd.spec.HapiSpec; @@ -16,6 +17,9 @@ import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Assertions; +/** + * Validates the structure of blocks. + */ public class BlockContentsValidator implements BlockStreamValidator { private static final Logger logger = LogManager.getLogger(BlockContentsValidator.class); @@ -51,50 +55,113 @@ public void validateBlocks(@NonNull final List blocks) { } private static void validate(Block block) { - final var blockItems = block.items(); + final var items = block.items(); + if (items.isEmpty()) { + Assertions.fail("Block is empty"); + } - // A block SHALL start with a `header`. - if (!blockItems.getFirst().hasBlockHeader()) { - Assertions.fail("Block does not start with a block header"); + if (items.size() <= 2) { + Assertions.fail("Block contains insufficient number of block items"); } - // A block SHALL end with a `state_proof`. - if (!blockItems.getLast().hasBlockProof()) { - Assertions.fail("Block does not end with a block proof"); + // A block SHALL start with a `block_header`. + validateBlockHeader(items.getFirst()); + + validateRounds(items.subList(1, items.size() - 1)); + + // A block SHALL end with a `block_proof`. + validateBlockProof(items.getLast()); + } + + private static void validateBlockHeader(final BlockItem item) { + if (!item.hasBlockHeader()) { + Assertions.fail("Block must start with a block header"); } + } - // In general, a `block_header` SHALL be followed by an `round_header` - if (!blockItems.get(1).hasRoundHeader()) { - Assertions.fail("Block header not followed by an round header"); + private static void validateBlockProof(final BlockItem item) { + if (!item.hasBlockProof()) { + Assertions.fail("Block must end with a block proof"); } + } - // In general, a `round_header` SHALL be followed by an `event_header`, but for hapiTestRestart we get - // state change singleton update for BLOCK_INFO_VALUE because the post-restart State initialization changes - // state before any event has reached consensus - if (!blockItems.get(2).hasEventHeader() && !blockItems.get(2).hasStateChanges()) { - Assertions.fail("Round header not followed by an event header or state changes"); + private static void validateRounds(final List roundItems) { + int currentIndex = 0; + while (currentIndex < roundItems.size()) { + currentIndex = validateSingleRound(roundItems, currentIndex); } + } + + /** + * Validates a single round within a block, starting at the given index. + * Returns the index of the next item after this round. + */ + private static int validateSingleRound(final List items, int startIndex) { + // Validate round header + if (!items.get(startIndex).hasRoundHeader()) { + logger.error("Expected round header at index {}, found: {}", startIndex, items.get(startIndex)); + Assertions.fail("Round must start with a round header"); + } + + int currentIndex = startIndex + 1; + boolean hasEventOrStateChange = false; - if (blockItems.stream().noneMatch(BlockItem::hasEventTransaction)) { // block without a user transaction - // A block with no user transactions contains a `block_header`, `event_headers`, `state_changes` and - // `state_proof`. - if (blockItems.stream() - .skip(2) // skip block_header and round_header - .limit(blockItems.size() - 3L) // skip state_proof - .anyMatch(item -> !item.hasEventHeader() && !item.hasStateChanges())) { + // Process items in this round until we hit the next round header or end of items + while (currentIndex < items.size() && !items.get(currentIndex).hasRoundHeader()) { + BlockItem item = items.get(currentIndex); + + if (item.hasEventHeader() || item.hasStateChanges()) { + hasEventOrStateChange = true; + currentIndex++; + } else if (item.hasEventTransaction()) { + currentIndex = validateTransactionGroup(items, currentIndex); + } else if (item.hasTransactionResult() || item.hasTransactionOutput()) { + logger.error( + "Found transaction result or output without preceding transaction at index {}", currentIndex); Assertions.fail( - "Block with no user transactions should contain items of type `block_header`, `event_headers`, `state_changes` or `state_proof`"); + "Found transaction result or output without preceding transaction at index " + currentIndex); + } else { + logger.error("Invalid item type at index {}: {}", currentIndex, item); + Assertions.fail("Invalid item type at index " + currentIndex + ": " + item); } + } + + if (!hasEventOrStateChange) { + logger.error("Round starting at index {} has no event headers or state changes", startIndex); + Assertions.fail("Round starting at index " + startIndex + " has no event headers or state changes"); + } + + return currentIndex; + } + + /** + * Validates a transaction group (transaction + result + optional outputs). + * Returns the index of the next item after this group. + */ + private static int validateTransactionGroup(final List items, int transactionIndex) { + if (transactionIndex + 1 >= items.size()) { + Assertions.fail("Event transaction at end of block with no result"); + } - return; + // Check for transaction result + BlockItem nextItem = items.get(transactionIndex + 1); + if (!nextItem.hasTransactionResult()) { + logger.error("Expected transaction result at index {}, found: {}", transactionIndex + 1, nextItem); + Assertions.fail("Event transaction must be followed by transaction result"); } - for (int i = 0; i < blockItems.size(); i++) { - // An `event_transaction` SHALL be followed by a `transaction_result`. - if (blockItems.get(i).hasEventTransaction() - && !blockItems.get(i + 1).hasTransactionResult()) { - Assertions.fail("Event transaction not followed by a transaction result"); + // Check for optional transaction outputs + int currentIndex = transactionIndex + 2; + while (currentIndex < items.size() && items.get(currentIndex).hasTransactionOutput()) { + // Check that transaction output is not equal to TransactionOutput.DEFAULT + if (TransactionOutput.DEFAULT.equals(items.get(currentIndex).transactionOutput())) { + logger.error("Transaction output at index {} is equal to TransactionOutput.DEFAULT", currentIndex); + Assertions.fail( + "Transaction output at index " + currentIndex + " is equal to TransactionOutput.DEFAULT"); } + currentIndex++; } + + return currentIndex; } } diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/UtilVerbs.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/UtilVerbs.java index a6f76c300301..ac6f14783a5a 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/UtilVerbs.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/UtilVerbs.java @@ -141,6 +141,7 @@ import com.hedera.services.bdd.spec.utilops.mod.TxnModification; import com.hedera.services.bdd.spec.utilops.pauses.HapiSpecSleep; import com.hedera.services.bdd.spec.utilops.pauses.HapiSpecWaitUntil; +import com.hedera.services.bdd.spec.utilops.pauses.HapiSpecWaitUntilNextBlock; import com.hedera.services.bdd.spec.utilops.streams.LogContainmentOp; import com.hedera.services.bdd.spec.utilops.streams.LogValidationOp; import com.hedera.services.bdd.spec.utilops.streams.StreamValidationOp; @@ -682,6 +683,14 @@ public static HapiSpecWaitUntil waitUntilStartOfNextAdhocPeriod(final long perio return untilStartOfNextAdhocPeriod(periodMs); } + /** + * Returns a {@link HapiSpecOperation} that sleeps until at least the beginning of the next block stream block. + * @return the operation that sleeps until the beginning of the next block stream block + */ + public static HapiSpecWaitUntilNextBlock waitUntilNextBlock() { + return new HapiSpecWaitUntilNextBlock(); + } + public static HapiSpecWaitUntil waitUntilJustBeforeNextStakingPeriod( final long stakePeriodMins, final long secondsBefore) { return untilJustBeforeStakingPeriod(stakePeriodMins, secondsBefore); diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/pauses/HapiSpecWaitUntilNextBlock.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/pauses/HapiSpecWaitUntilNextBlock.java new file mode 100644 index 000000000000..caa2c4fa800e --- /dev/null +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/spec/utilops/pauses/HapiSpecWaitUntilNextBlock.java @@ -0,0 +1,130 @@ +// SPDX-License-Identifier: Apache-2.0 +package com.hedera.services.bdd.spec.utilops.pauses; + +import static com.hedera.services.bdd.junit.hedera.ExternalPath.BLOCK_STREAMS_DIR; +import static com.hedera.services.bdd.spec.transactions.TxnVerbs.cryptoTransfer; +import static com.hedera.services.bdd.spec.transactions.crypto.HapiCryptoTransfer.tinyBarsFromTo; +import static com.hedera.services.bdd.spec.utilops.CustomSpecAssert.allRunFor; +import static com.hedera.services.bdd.suites.HapiSuite.FUNDING; +import static com.hedera.services.bdd.suites.HapiSuite.GENESIS; + +import com.hedera.node.app.blocks.impl.FileBlockItemWriter; +import com.hedera.services.bdd.junit.support.BlockStreamAccess; +import com.hedera.services.bdd.spec.HapiSpec; +import com.hedera.services.bdd.spec.utilops.UtilOp; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class HapiSpecWaitUntilNextBlock extends UtilOp { + private static final Logger log = LogManager.getLogger(HapiSpecWaitUntilNextBlock.class); + private static final String BLOCK_FILE_EXTENSION = ".blk"; + private static final String COMPRESSED_BLOCK_FILE_EXTENSION = BLOCK_FILE_EXTENSION + ".gz"; + private static final String MARKER_FILE_EXTENSION = ".mf"; + private static final Duration POLL_INTERVAL = Duration.ofMillis(100); + private static final Duration BACKGROUND_TRAFFIC_INTERVAL = Duration.ofMillis(1000); + private static final Duration TIMEOUT = Duration.ofSeconds(10); + + private boolean backgroundTraffic; + + public HapiSpecWaitUntilNextBlock withBackgroundTraffic(final boolean backgroundTraffic) { + this.backgroundTraffic = backgroundTraffic; + return this; + } + + @Override + protected boolean submitOp(@NonNull final HapiSpec spec) throws Throwable { + final var blockDir = spec.targetNetworkOrThrow().nodes().getFirst().getExternalPath(BLOCK_STREAMS_DIR); + if (blockDir == null) { + throw new IllegalStateException("Block stream directory not available"); + } + + // Ensure the directory exists before trying to walk it + if (!Files.exists(blockDir)) { + log.info("Creating block stream directory at {}", blockDir); + Files.createDirectories(blockDir); + } + + final var currentBlock = findLatestBlockNumber(blockDir); + final var targetBlock = currentBlock + 1; + + log.info("Waiting for block {} to appear (current block is {})", targetBlock, currentBlock); + + // Start background traffic if configured + final var stopTraffic = new AtomicBoolean(false); + CompletableFuture trafficFuture = null; + if (backgroundTraffic) { + trafficFuture = CompletableFuture.runAsync(() -> { + while (!stopTraffic.get()) { + try { + // Execute the background traffic operation + allRunFor( + spec, + cryptoTransfer(tinyBarsFromTo(GENESIS, FUNDING, 1)) + .deferStatusResolution() + .noLogging() + .hasAnyStatusAtAll()); + // Advance consensus time after successful execution + spec.sleepConsensusTime(BACKGROUND_TRAFFIC_INTERVAL); + } catch (Exception e) { + // Log but continue trying + log.info("Background traffic iteration failed", e); + } + } + }); + } + + try { + final var startTime = System.currentTimeMillis(); + while (true) { + if (isBlockComplete(blockDir, targetBlock)) { + log.info("Block {} has been created and completed", targetBlock); + return false; + } + if (System.currentTimeMillis() - startTime > TIMEOUT.toMillis()) { + throw new RuntimeException(String.format( + "Timeout waiting for block %d after %d seconds", targetBlock, TIMEOUT.toSeconds())); + } + spec.sleepConsensusTime(POLL_INTERVAL); + } + } finally { + if (trafficFuture != null) { + stopTraffic.set(true); + trafficFuture.join(); + } + } + } + + private long findLatestBlockNumber(Path blockDir) throws IOException { + try (Stream files = Files.walk(blockDir)) { + return files.filter(this::isBlockFile) + .map(BlockStreamAccess::extractBlockNumber) + .filter(num -> num >= 0) + .max(Long::compareTo) + .orElse(-1L); + } + } + + private boolean isBlockComplete(Path blockDir, long blockNumber) throws IOException { + try (Stream files = Files.walk(blockDir)) { + return files.anyMatch(path -> { + String fileName = path.getFileName().toString(); + return fileName.startsWith(FileBlockItemWriter.longToFileName(blockNumber)) + && fileName.endsWith(MARKER_FILE_EXTENSION); + }); + } + } + + private boolean isBlockFile(Path path) { + String fileName = path.getFileName().toString(); + return Files.isRegularFile(path) + && (fileName.endsWith(BLOCK_FILE_EXTENSION) || fileName.endsWith(COMPRESSED_BLOCK_FILE_EXTENSION)); + } +} diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/contract/records/RecordsSuite.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/contract/records/RecordsSuite.java index 2b57c65f932f..b4a5e9a1f3a6 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/contract/records/RecordsSuite.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/contract/records/RecordsSuite.java @@ -16,8 +16,7 @@ import static com.hedera.services.bdd.spec.utilops.CustomSpecAssert.allRunFor; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.assertionsHold; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.newKeyNamed; -import static com.hedera.services.bdd.spec.utilops.UtilVerbs.sleepFor; -import static com.hedera.services.bdd.spec.utilops.UtilVerbs.waitUntilStartOfNextAdhocPeriod; +import static com.hedera.services.bdd.spec.utilops.UtilVerbs.waitUntilNextBlock; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.withOpContext; import static com.hedera.services.bdd.suites.HapiSuite.GENESIS; import static com.hedera.services.bdd.suites.HapiSuite.ONE_HUNDRED_HBARS; @@ -37,8 +36,6 @@ import com.hedera.services.bdd.junit.HapiTest; import com.hedera.services.bdd.junit.RepeatableHapiTest; import com.hedera.services.bdd.spec.HapiSpec; -import com.hedera.services.bdd.spec.HapiSpecOperation; -import com.hedera.services.bdd.spec.transactions.token.TokenMovement; import com.hedera.services.bdd.spec.utilops.CustomSpecAssert; import com.hederahashgraph.api.proto.java.AccountAmount; import com.hederahashgraph.api.proto.java.AccountID; @@ -50,7 +47,6 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; -import java.util.function.Consumer; import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.Assertions; @@ -143,7 +139,7 @@ final Stream blck003ReturnsTimestampOfTheBlock() { uploadInitCode(contract), contractCreate(contract), // Ensure we submit these two transactions in the same block - waitUntilStartOfNextAdhocPeriod(2_000), + waitUntilNextBlock().withBackgroundTraffic(true), ethereumCall(contract, LOG_NOW) .type(EthTxData.EthTransactionType.EIP1559) .signingWith(SECP_256K1_SOURCE_KEY) @@ -219,7 +215,7 @@ final Stream blck001And002And003And004ReturnsCorrectBlockProperties getTxnRecord(AUTO_ACCOUNT).andAllChildRecords(), uploadInitCode(contract), contractCreate(contract), - waitUntilStartOfNextAdhocPeriod(2_000L), + waitUntilNextBlock().withBackgroundTraffic(true), ethereumCall(contract, LOG_NOW) .type(EthTxData.EthTransactionType.EIP1559) .signingWith(SECP_256K1_SOURCE_KEY) @@ -231,7 +227,7 @@ final Stream blck001And002And003And004ReturnsCorrectBlockProperties .deferStatusResolution() .hasKnownStatus(ResponseCodeEnum.SUCCESS), // Make sure we submit the next transaction in the next block - waitUntilStartOfNextAdhocPeriod(2_000L), + waitUntilNextBlock().withBackgroundTraffic(true), ethereumCall(contract, LOG_NOW) .type(EthTxData.EthTransactionType.EIP1559) .signingWith(SECP_256K1_SOURCE_KEY) @@ -301,14 +297,13 @@ final Stream blockHashReturnsTheHashOfTheLatest256Blocks() { cryptoCreate(RECEIVER), cryptoTransfer(tinyBarsFromAccountToAlias(GENESIS, SECP_256K1_SOURCE_KEY, ONE_HUNDRED_HBARS)), withOpContext((spec, opLog) -> { - doNTransfers(spec, 256); - waitUntilStartOfNextAdhocPeriod(2_000L); + createNBlocks(spec, 256); final var ethCall = ethereumCall(contract, "getAllBlockHashes") .logged() .gasLimit(4_000_000L) .via("blockHashes"); final var blockHashRes = getTxnRecord("blockHashes").logged(); - allRunFor(spec, ethCall, waitUntilStartOfNextAdhocPeriod(2_000L), blockHashRes); + allRunFor(spec, ethCall, waitUntilNextBlock().withBackgroundTraffic(true), blockHashRes); assertTrue(blockHashRes .getResponseRecord() .getContractCallResult() @@ -324,18 +319,11 @@ final Stream blockHashReturnsTheHashOfTheLatest256Blocks() { })); } - // Helper method to perform multiple transfers and simulate multiple block creations - private void doNTransfers(@NonNull final HapiSpec spec, final int amount) { - allRunFor( - spec, - Stream.iterate(1, i -> i + 1) - .limit(amount) - .mapMulti((Integer i, Consumer consumer) -> { - consumer.accept(sleepFor(2_000L)); - consumer.accept( - cryptoTransfer(TokenMovement.movingHbar(i).between(ACCOUNT, RECEIVER))); - }) - .toArray(HapiSpecOperation[]::new)); + // Helper method to create N blocks, amount is divided by 2 to account waiting for next block each iteration + private void createNBlocks(final HapiSpec spec, final int amount) { + for (int i = 0; i < amount / 2; i++) { + allRunFor(spec, waitUntilNextBlock().withBackgroundTraffic(true)); + } } /** diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/hip993/SystemFileExportsTest.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/hip993/SystemFileExportsTest.java index 5bce4c80786f..cdb96feb711b 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/hip993/SystemFileExportsTest.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/hip993/SystemFileExportsTest.java @@ -26,6 +26,7 @@ import static com.hedera.services.bdd.spec.utilops.EmbeddedVerbs.simulatePostUpgradeTransaction; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.blockingOrder; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.doWithStartupConfig; +import static com.hedera.services.bdd.spec.utilops.UtilVerbs.doingContextual; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.given; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.nOps; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.newKeyNamed; @@ -37,6 +38,7 @@ import static com.hedera.services.bdd.spec.utilops.UtilVerbs.sourcingContextual; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.validateChargedUsdWithin; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.visibleItems; +import static com.hedera.services.bdd.spec.utilops.UtilVerbs.waitUntilNextBlock; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.withOpContext; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.writeToNodeWorkingDirs; import static com.hedera.services.bdd.spec.utilops.grouping.GroupingVerbs.getSystemFiles; @@ -134,9 +136,12 @@ final Stream syntheticNodeDetailsUpdateHappensAtUpgradeBoundary() { .serviceEndpoint(endpointsFor(i)) .grpcCertificateHash(grpcCertHashes[i]) .gossipCaCertificate(derEncoded(gossipCertificates.get().get((long) i)))))), + waitUntilNextBlock().withBackgroundTraffic(true), // And now simulate an upgrade boundary simulatePostUpgradeTransaction(), - cryptoCreate("secondUser").via("addressBookExport")); + cryptoCreate("secondUser").via("addressBookExport"), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest @@ -157,9 +162,12 @@ final Stream syntheticAddressBookUpdateHappensAtUpgradeBoundary() { .serviceEndpoint(endpointsFor(i)) .grpcCertificateHash(grpcCertHashes[i]) .gossipCaCertificate(derEncoded(gossipCertificates.get().get((long) i)))))), + waitUntilNextBlock().withBackgroundTraffic(true), // And now simulate an upgrade boundary simulatePostUpgradeTransaction(), - cryptoCreate("secondUser").via("addressBookExport")); + cryptoCreate("secondUser").via("addressBookExport"), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest @@ -215,6 +223,7 @@ final Stream syntheticFeeSchedulesUpdateHappensAtUpgradeBoundary() "networkAdmin.upgradeFeeSchedulesFile", feeSchedulesFile -> writeToNodeWorkingDirs(feeSchedulesJson, "data", "config", feeSchedulesFile)), + waitUntilNextBlock().withBackgroundTraffic(true), // And now simulate an upgrade boundary simulatePostUpgradeTransaction(), // Verify the new fee schedules (which include a subtype for scheduled contract fees) are in effect @@ -230,7 +239,9 @@ final Stream syntheticFeeSchedulesUpdateHappensAtUpgradeBoundary() .fee(ONE_HBAR)) .payingWith("civilian") .via("contractCall"), - validateChargedUsdWithin("contractCall", 0.1, 3.0)); + validateChargedUsdWithin("contractCall", 0.1, 3.0), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest @@ -252,6 +263,7 @@ final Stream syntheticThrottlesUpdateHappensAtUpgradeBoundary() thr doWithStartupConfig( "networkAdmin.upgradeThrottlesFile", throttleDefsFile -> writeToNodeWorkingDirs(throttlesJson, "data", "config", throttleDefsFile)), + waitUntilNextBlock().withBackgroundTraffic(true), // And now simulate an upgrade boundary simulatePostUpgradeTransaction(), // Then verify the new throttles are in effect @@ -262,7 +274,9 @@ final Stream syntheticThrottlesUpdateHappensAtUpgradeBoundary() thr .deferStatusResolution(), mintToken("nft", List.of(ByteString.copyFromUtf8("NO"))) .payingWith("civilian") - .hasPrecheck(BUSY)); + .hasPrecheck(BUSY), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest @@ -286,6 +300,7 @@ final Stream syntheticPropertyOverridesUpdateHappensAtUpgradeBounda "networkAdmin.upgradePropertyOverridesFile", propOverridesFile -> writeToNodeWorkingDirs(overrideProperties, "data", "config", propOverridesFile)), + waitUntilNextBlock().withBackgroundTraffic(true), // And now simulate an upgrade boundary simulatePostUpgradeTransaction(), // Then verify the new properties are in effect @@ -296,7 +311,9 @@ final Stream syntheticPropertyOverridesUpdateHappensAtUpgradeBounda ByteString.copyFromUtf8("ONE"), ByteString.copyFromUtf8("TOO"), ByteString.copyFromUtf8("MANY"))) - .hasKnownStatus(BATCH_SIZE_LIMIT_EXCEEDED)); + .hasKnownStatus(BATCH_SIZE_LIMIT_EXCEEDED), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest @@ -315,6 +332,7 @@ final Stream syntheticPropertyOverridesUpdateCanBeEmptyFile() { doWithStartupConfig( "networkAdmin.upgradePropertyOverridesFile", propOverridesFile -> writeToNodeWorkingDirs("", "data", "config", propOverridesFile)), + waitUntilNextBlock().withBackgroundTraffic(true), // And now simulate an upgrade boundary simulatePostUpgradeTransaction(), // Then verify the previous override properties are cleared @@ -325,7 +343,9 @@ final Stream syntheticPropertyOverridesUpdateCanBeEmptyFile() { ByteString.copyFromUtf8("ONCE"), ByteString.copyFromUtf8("AGAIN"), ByteString.copyFromUtf8("OK"))), - getFileContents(APP_PROPERTIES).hasContents(ignore -> new byte[0])); + getFileContents(APP_PROPERTIES).hasContents(ignore -> new byte[0]), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest @@ -349,6 +369,7 @@ final Stream syntheticPermissionOverridesUpdateHappensAtUpgradeBoun "networkAdmin.upgradePermissionOverridesFile", permissionOverridesFile -> writeToNodeWorkingDirs(overridePermissions, "data", "config", permissionOverridesFile)), + waitUntilNextBlock().withBackgroundTraffic(true), // And now simulate an upgrade boundary simulatePostUpgradeTransaction(), // Then verify the new permissions are in effect @@ -361,7 +382,9 @@ final Stream syntheticPermissionOverridesUpdateHappensAtUpgradeBoun ByteString.copyFromUtf8("TO"), ByteString.copyFromUtf8("BE"))) .payingWith("civilian") - .hasKnownStatus(UNAUTHORIZED)); + .hasKnownStatus(UNAUTHORIZED), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest @@ -391,6 +414,7 @@ final Stream syntheticNodeAdminKeysUpdateHappensAtUpgradeBoundary() "data", "config", nodeAdminKeysFile))), + waitUntilNextBlock().withBackgroundTraffic(true), // And now simulate an upgrade boundary simulatePostUpgradeTransaction(), // Then verify the new admin keys are in effect @@ -408,7 +432,9 @@ final Stream syntheticNodeAdminKeysUpdateHappensAtUpgradeBoundary() nodeUpdate("3") .payingWith(GENESIS) .signedBy(GENESIS, "node3AdminKey") - .description("C")); + .description("C"), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/integration/ConcurrentIntegrationTests.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/integration/ConcurrentIntegrationTests.java index 12486cee8e7e..4389f6ce3c0c 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/integration/ConcurrentIntegrationTests.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/integration/ConcurrentIntegrationTests.java @@ -31,6 +31,7 @@ import static com.hedera.services.bdd.spec.utilops.UtilVerbs.blockStreamMustIncludePassFrom; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.buildUpgradeZipFrom; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.createHollow; +import static com.hedera.services.bdd.spec.utilops.UtilVerbs.doingContextual; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.freezeUpgrade; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.mutateNode; import static com.hedera.services.bdd.spec.utilops.UtilVerbs.prepareUpgrade; @@ -123,7 +124,9 @@ final Stream skipsStaleEventWithBusyStatus() { .setNode(asEntityString(4)) .withSubmissionStrategy(usingVersion(PAST)) .hasKnownStatus(com.hederahashgraph.api.proto.java.ResponseCodeEnum.BUSY), - getAccountBalance("somebody").hasTinyBars(0L)); + getAccountBalance("somebody").hasTinyBars(0L), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @EmbeddedHapiTest(MANIPULATES_EVENT_VERSION) @@ -153,7 +156,9 @@ final Stream failInvalidDuringDispatchRechargesFees() { // Confirm the payer was still charged a non-zero fee getAccountBalance("treasury") .hasTinyBars(spec -> amount -> - Optional.ofNullable(amount == ONE_HUNDRED_HBARS ? "Fee was not recharged" : null))); + Optional.ofNullable(amount == ONE_HUNDRED_HBARS ? "Fee was not recharged" : null)), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } @GenesisHapiTest diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/queries/AsNodeOperatorQueriesTest.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/queries/AsNodeOperatorQueriesTest.java index 789c5e2286e8..ba6873988f8b 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/queries/AsNodeOperatorQueriesTest.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/queries/AsNodeOperatorQueriesTest.java @@ -366,7 +366,7 @@ final Stream fileGetContentsQueryNodeOperatorNotCharged() { fileCreate(filename).contents("anyContent"), getFileContents(filename).payingWith(NODE_OPERATOR).asNodeOperator(), getFileContents(filename).payingWith(PAYER), - sleepFor(1_000), + sleepFor(3_000), getAccountBalance(NODE_OPERATOR).hasTinyBars(ONE_HUNDRED_HBARS), getAccountBalance(PAYER).hasTinyBars(lessThan(ONE_HUNDRED_HBARS)))); } diff --git a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/staking/RepeatableStakingTests.java b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/staking/RepeatableStakingTests.java index 4003885eb4cf..ed345cf83e97 100644 --- a/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/staking/RepeatableStakingTests.java +++ b/hedera-node/test-clients/src/main/java/com/hedera/services/bdd/suites/staking/RepeatableStakingTests.java @@ -29,6 +29,7 @@ import com.hedera.services.bdd.junit.HapiTestLifecycle; import com.hedera.services.bdd.junit.RepeatableHapiTest; import com.hedera.services.bdd.junit.support.TestLifecycle; +import com.hedera.services.bdd.spec.transactions.TxnUtils; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.Instant; import java.util.List; @@ -107,6 +108,8 @@ Stream scheduledTransactionCrossingThresholdTriggersExpectedRewards // And we adjust the nanos so the user transaction will be in this staking // period, but the triggered transaction will be in the next staking period .minusNanos(Long.parseLong(value) + 1))), - cryptoCreate("justBeforeSecondPeriod")); + cryptoCreate("justBeforeSecondPeriod"), + // Trigger block closure to ensure block is closed + doingContextual(TxnUtils::triggerAndCloseAtLeastOneFileIfNotInterrupted)); } }