feat: Initial consensus node to block node communication #17769

Merged 47 commits on Feb 21, 2025
Changes from 36 commits
Commits
f3a1286
wip
derektriley Jan 17, 2025
b6bb46f
wip testcontainer for Block Node
derektriley Jan 17, 2025
d835c7d
Sucessfully connect to block nodes
derektriley Jan 17, 2025
eec6a81
wip
derektriley Jan 17, 2025
82c9b1e
added block node connections proto. enhanced test suite for option of…
derektriley Jan 17, 2025
b0191c5
baseline streaming impl working with block node containers
derektriley Jan 20, 2025
abc545f
Establish connections to block nodes before starting platform
derektriley Jan 20, 2025
2e0cd87
Add simple simulator mode for quicker test iterations
derektriley Jan 20, 2025
23351ec
Gracefully stop simulated block nodes
derektriley Jan 20, 2025
1c0d308
Add local block node mode
derektriley Jan 21, 2025
24f1d35
changes to cn to bn grpc communication
petreze Feb 4, 2025
0a6665d
clean up connectToNode method
petreze Feb 4, 2025
6fe286f
Merge branch 'main' into wip-cn-to-bn
petreze Feb 7, 2025
a93ad69
cleanup
petreze Feb 7, 2025
cbfc3cc
fix gradle dependencies
petreze Feb 7, 2025
e8a1363
fix unit test
petreze Feb 7, 2025
04acee6
add unit tests and address PR comments
petreze Feb 10, 2025
14d7a61
satisfy the bare minimum requirements
petreze Feb 10, 2025
6b51f08
fix config files and other cleanups
petreze Feb 10, 2025
8d4f9e2
Merge branch 'main' into wip-cn-to-bn
petreze Feb 10, 2025
42dcb32
fix module dependency
petreze Feb 10, 2025
fd94032
fix module dependency
petreze Feb 10, 2025
c1cc0c5
remove unecessary test
petreze Feb 11, 2025
c72e7d4
unnecessary change in test-clients network setup
petreze Feb 11, 2025
1ed8a96
improve comments in block_node_connections.proto protobuf
petreze Feb 11, 2025
b3e21e4
remove boilerplate code
petreze Feb 11, 2025
9aeaf90
Merge branch 'main' into wip-cn-to-bn
petreze Feb 12, 2025
8d37800
Merge branch 'main' into wip-cn-to-bn
petreze Feb 13, 2025
3ac9098
fixes after merge with main
petreze Feb 13, 2025
c052347
fix resetting state variable in WritableQueueStateBase
Neeharika-Sompalli Feb 14, 2025
97c43a5
Merge branch '17904-D-reset-queue-state' into wip-cn-to-bn
petreze Feb 14, 2025
779837f
Merge branch 'main' into wip-cn-to-bn
petreze Feb 15, 2025
f529002
update wrong log
petreze Feb 17, 2025
602f775
Merge branch 'main' into wip-cn-to-bn
petreze Feb 18, 2025
bd47ceb
Merge branch 'main' into wip-cn-to-bn
petreze Feb 18, 2025
5dba1e9
Merge branch 'main' into wip-cn-to-bn
petreze Feb 18, 2025
4d04c2c
Merge branch 'main' into wip-cn-to-bn
petreze Feb 20, 2025
02518b5
address PR comments
petreze Feb 20, 2025
65cabfd
apply spotless with new headers
petreze Feb 20, 2025
d11511e
Merge branch 'main' into wip-cn-to-bn
petreze Feb 20, 2025
cc08583
make combined item writer to files and grpc streaming
petreze Feb 20, 2025
17774a5
Merge branch 'main' into wip-cn-to-bn
petreze Feb 20, 2025
d72a19b
Merge branch 'main' into wip-cn-to-bn
petreze Feb 20, 2025
0551431
add comments
petreze Feb 20, 2025
0ba3724
Merge branch 'main' into wip-cn-to-bn
petreze Feb 20, 2025
60b8212
Merge branch 'main' into wip-cn-to-bn
petreze Feb 21, 2025
7c511bf
fix compilation
petreze Feb 21, 2025
65 changes: 65 additions & 0 deletions hapi/internal-protobufs/network/block_node_connections.proto
@@ -0,0 +1,65 @@
/**
* # Block Node Connections
* A representation of the configuration for connecting to a set of block nodes.
*
* ### Keywords
* The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL NOT",
* "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and "OPTIONAL" in this
* document are to be interpreted as described in
* [RFC2119](https://www.ietf.org/rfc/rfc2119) and clarified in
* [RFC8174](https://www.ietf.org/rfc/rfc8174).
*/

syntax = "proto3";

package com.hedera.hapi.network;

option java_package = "com.hedera.node.internal.network.legacy";
// <<<pbj.java_package = "com.hedera.node.internal.network">>> This comment is special code for setting PBJ Compiler java package
option java_multiple_files = true;

/**
* A single block node connection configuration.<br/>
*
* This message provides the necessary details for a consensus node to
* establish a connection and stream blocks to a block node.<br/>
* Each instance of this message represents one block node connection.
* Multiple instances may be included in {@link BlockNodeConnectionInfo}.
* - Each instance SHALL contain a valid address and port.<br/>
* - A consensus node SHALL connect to all listed block nodes.<br/>
* - The address MUST be a valid DNS hostname or IP address.<br/>
* - The port SHALL be within the valid TCP/UDP port range.<br/>
*/
message BlockNodeConfig {
/**
* The network address of the block node.<br/>
* This MAY be an IP address or a DNS hostname.
*/
string address = 1;

/**
* The port used to communicate with the block node.<br/>
* Valid port range is 1 to 65535.
*/
int32 port = 2;
}

/**
* Overall connection configuration for block nodes.<br/>
*
* The BlockNodeConnectionInfo message SHOULD encapsulate all the information needed
* for a consensus node to connect to a set of block nodes and stream blocks.
*/
message BlockNodeConnectionInfo {
/**
* A list of block node configurations.<br/>
* This field SHALL list all block node configurations
* to which the consensus node SHOULD connect.
*/
repeated BlockNodeConfig nodes = 1;

/**
* The size of the batches of block items to be streamed to block nodes.<br/>
*/
int32 block_item_batch_size = 2;
}
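Note on the constraints documented in block_node_connections.proto: protobuf itself does not enforce a port range or a non-empty address, so a consumer of this message would need to check them when loading the configuration. A minimal sketch of such a check follows. The record types NodeEndpoint and ConnectionInfo are hypothetical stand-ins for the generated classes, the address check only rejects blank strings (it does not verify DNS or IP syntax), and the requirement that the batch size be positive is an assumption that the proto comments do not state.

```java
import java.util.List;

/** Hypothetical stand-in for the generated BlockNodeConfig type. */
record NodeEndpoint(String address, int port) {}

/** Hypothetical stand-in for the generated BlockNodeConnectionInfo type. */
record ConnectionInfo(List<NodeEndpoint> nodes, int blockItemBatchSize) {}

final class ConnectionInfoValidator {
    private ConnectionInfoValidator() {}

    /** Returns true when the address is non-blank and the port is within 1..65535. */
    static boolean isValidNode(NodeEndpoint node) {
        return node.address() != null
                && !node.address().isBlank()
                && node.port() >= 1
                && node.port() <= 65535;
    }

    /** Returns true when the batch size is positive (assumed rule) and every node is valid. */
    static boolean isValidInfo(ConnectionInfo info) {
        return info.blockItemBatchSize() > 0
                && info.nodes().stream().allMatch(node -> isValidNode(node));
    }
}
```

With the values from the dev block-nodes.json in this PR (localhost, 8080, batch size 256), isValidInfo returns true under these assumed rules.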
1 change: 1 addition & 0 deletions hedera-node/configuration/dev/application.properties
@@ -16,6 +16,7 @@ ledger.id=0x03
blockStream.streamMode=BOTH
nodes.enableDAB=true
blockStream.blockFileDir=data/blockStreams
blockStream.blockNodeConnectionFileDir=data/config
grpc.nodeOperatorPortEnabled=true
# For CI tests we want to override roster weights from override-network.json
networkAdmin.preserveStateWeightsDuringOverride=false
9 changes: 9 additions & 0 deletions hedera-node/configuration/dev/block-nodes.json
@@ -0,0 +1,9 @@
{
"nodes": [
{
"address": "localhost",
"port": 8080
}
],
"blockItemBatchSize": 256
}
1 change: 1 addition & 0 deletions hedera-node/configuration/dev/node.properties
@@ -3,3 +3,4 @@ hedera.recordStream.logDir=data/recordStreams
hedera.recordStream.sidecarDir=
stats.executionTimesToTrack=1000
blockStream.blockFileDir=data/blockStreams
blockStream.blockNodeConnectionFileDir=data/config
@@ -593,12 +593,30 @@ public StateLifecycles<PlatformMerkleStateRoot> newStateLifecycles() {
public void notify(@NonNull final PlatformStatusChangeNotification notification) {
this.platformStatus = notification.getNewStatus();
logger.info("HederaNode#{} is {}", platform.getSelfId(), platformStatus.name());
final var streamToBlockNodes = configProvider
.getConfiguration()
.getConfigData(BlockStreamConfig.class)
.streamToBlockNodes();
switch (platformStatus) {
case ACTIVE -> startGrpcServer();
case CATASTROPHIC_FAILURE -> shutdownGrpcServer();
case ACTIVE -> {
startGrpcServer();
}
case FREEZE_COMPLETE -> {
logger.info("Platform status is now FREEZE_COMPLETE");
shutdownGrpcServer();
closeRecordStreams();
if (streamToBlockNodes && isNotEmbedded()) {
logger.info("FREEZE_COMPLETE - Shutting down connections to Block Nodes");
daggerApp.blockNodeConnectionManager().shutdown();
}
}
case CATASTROPHIC_FAILURE -> {
logger.error("Platform status is now CATASTROPHIC_FAILURE");
shutdownGrpcServer();
if (streamToBlockNodes && isNotEmbedded()) {
logger.info("CATASTROPHIC_FAILURE - Shutting down connections to Block Nodes");
daggerApp.blockNodeConnectionManager().shutdown();
}
}
case REPLAYING_EVENTS, STARTING_UP, OBSERVING, RECONNECT_COMPLETE, CHECKING, FREEZING, BEHIND -> {
// Nothing to do here, just enumerate for completeness
@@ -890,6 +908,9 @@ public void shutdown() {
shutdownGrpcServer();

if (daggerApp != null) {
logger.debug("Shutting down the Block Node Connection Manager");
daggerApp.blockNodeConnectionManager().shutdown();

logger.debug("Shutting down the state");
final var state = daggerApp.workingStateAccessor().getState();
if (state instanceof MerkleStateRoot msr) {
@@ -1336,4 +1357,26 @@ private void onAdoptRoster() {
new ReadableHintsStoreImpl(initState.getReadableStates(HintsService.NAME)));
}
}

/**
* Initializes block node connections and waits for at least one connection to be established.
* This should be called before platform.start() to ensure we don't miss any blocks.
*
* @param timeout maximum time to wait for a connection
*/
public void initializeBlockNodeConnections(java.time.Duration timeout) {
final var blockStreamConfig = configProvider.getConfiguration().getConfigData(BlockStreamConfig.class);
if (!blockStreamConfig.streamToBlockNodes()) {
logger.info("Block stream to Block Nodes is disabled, skipping block node connection initialization");
return;
}

logger.info("Initializing block node connections with timeout {}", timeout);
boolean connected = daggerApp.blockNodeConnectionManager().waitForConnection(timeout);
if (blockStreamConfig.shutdownNodeOnNoBlockNodes() && !connected) {
logger.error("No block node connections established within timeout, shutting down");
this.shutdown();
System.exit(1);
}
}
}
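Note on waitForConnection(timeout): the diff shown here calls it but does not include its implementation. One common way to block until the first connection is reported, with a bounded wait, is a CountDownLatch. The sketch below is only an illustration of that technique. The class name ConnectionGate and the onConnected callback are hypothetical and are not taken from BlockNodeConnectionManager.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; not the BlockNodeConnectionManager from this PR. */
final class ConnectionGate {
    // Released once the first successful connection is reported.
    private final CountDownLatch firstConnection = new CountDownLatch(1);

    /** Assumed callback, invoked by whatever code establishes a connection. */
    void onConnected() {
        firstConnection.countDown();
    }

    /** Blocks until a connection has been reported or the timeout elapses. */
    boolean waitForConnection(Duration timeout) {
        try {
            return firstConnection.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt flag and report "not connected".
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller in the position of initializeBlockNodeConnections would treat a false return as "no block node reachable within the timeout" and decide, based on configuration, whether to shut down.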
@@ -26,6 +26,7 @@
import com.hedera.node.app.blocks.InitialStateHash;
import com.hedera.node.app.blocks.impl.BoundaryStateChangeListener;
import com.hedera.node.app.blocks.impl.KVStateChangeListener;
import com.hedera.node.app.blocks.impl.streaming.BlockNodeConnectionManager;
import com.hedera.node.app.components.IngestInjectionComponent;
import com.hedera.node.app.config.BootstrapConfigProviderImpl;
import com.hedera.node.app.config.ConfigProviderImpl;
@@ -138,6 +139,8 @@ public interface HederaInjectionComponent {

BlockRecordManager blockRecordManager();

BlockNodeConnectionManager blockNodeConnectionManager();

BlockStreamManager blockStreamManager();

FeeManager feeManager();
@@ -56,6 +56,7 @@
import com.hedera.node.app.state.StateLifecyclesImpl;
import com.hedera.node.app.tss.TssBlockHashSigner;
import com.hedera.node.app.version.ServicesSoftwareVersion;
import com.hedera.node.config.data.BlockStreamConfig;
import com.hedera.node.internal.network.Network;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.base.time.Time;
@@ -101,6 +102,7 @@
import com.swirlds.state.State;
import com.swirlds.state.merkle.MerkleStateRoot;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.time.InstantSource;
import java.util.List;
import java.util.Optional;
@@ -371,6 +373,12 @@ public static void main(final String... args) throws Exception {
.withSystemTransactionEncoderCallback(hedera::encodeSystemTransaction);
final var platform = platformBuilder.build();
hedera.init(platform, selfId);

// Initialize block node connections before starting the platform
final var waitPeriodForActiveConnection =
platformConfig.getConfigData(BlockStreamConfig.class).waitPeriodForActiveConnection();
hedera.initializeBlockNodeConnections(Duration.ofMinutes(waitPeriodForActiveConnection));

platform.start();
hedera.run();
}
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2024 Hedera Hashgraph, LLC
+ * Copyright (C) 2024-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,12 +17,12 @@
package com.hedera.node.app.blocks;

import com.hedera.node.app.blocks.impl.BlockStreamManagerImpl;
import com.hedera.node.app.blocks.impl.FileBlockItemWriter;
import com.hedera.node.app.blocks.impl.GrpcBlockItemWriter;
import com.hedera.node.app.blocks.impl.streaming.BlockNodeConnectionManager;
import com.hedera.node.app.blocks.impl.streaming.FileBlockItemWriter;
import com.hedera.node.app.blocks.impl.streaming.GrpcBlockItemWriter;
import com.hedera.node.config.ConfigProvider;
import com.hedera.node.config.data.BlockStreamConfig;
import com.swirlds.state.lifecycle.info.NodeInfo;
import dagger.Binds;
import dagger.Module;
import dagger.Provides;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -31,23 +31,32 @@
import javax.inject.Singleton;

@Module
public interface BlockStreamModule {
@Binds
public class BlockStreamModule {

@Provides
@Singleton
BlockStreamManager bindBlockStreamManager(BlockStreamManagerImpl blockStreamManager);
public BlockNodeConnectionManager provideBlockNodeConnectionManager(ConfigProvider configProvider) {
return new BlockNodeConnectionManager(configProvider);
}

@Provides
@Singleton
public BlockStreamManager provideBlockStreamManager(BlockStreamManagerImpl impl) {
return impl;
}

@Provides
@Singleton
static Supplier<BlockItemWriter> bindBlockItemWriterSupplier(
@NonNull final ConfigProvider configProvider,
@NonNull final NodeInfo selfNodeInfo,
@NonNull final FileSystem fileSystem) {
@NonNull final FileSystem fileSystem,
@NonNull final BlockNodeConnectionManager blockNodeConnectionManager) {
final var config = configProvider.getConfiguration();
final var blockStreamConfig = config.getConfigData(BlockStreamConfig.class);
return switch (blockStreamConfig.writerMode()) {
case FILE -> () -> new FileBlockItemWriter(configProvider, selfNodeInfo, fileSystem);
case GRPC -> () -> new GrpcBlockItemWriter(blockStreamConfig);
case FILE_AND_GRPC -> () -> new GrpcBlockItemWriter(blockNodeConnectionManager);
};
}
;
}
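Note on the writer-mode switch: the supplier is chosen once from configuration, and each call to get() then builds a fresh writer. For a mode that targets both files and gRPC, one possible shape is a composite writer that forwards every item to each delegate. The sketch below illustrates that pattern only. WriterMode, ItemSink, RecordingSink, CompositeSink and SinkSuppliers are hypothetical names, the sinks record strings in memory instead of writing files or streaming, and nothing here is taken from the FileBlockItemWriter or GrpcBlockItemWriter classes in this PR.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical writer modes, named after the cases in the switch above. */
enum WriterMode { FILE, GRPC, FILE_AND_GRPC }

/** Hypothetical, simplified stand-in for a block item writer. */
interface ItemSink {
    void write(String item);
}

/** Appends each item to a shared log, tagged with the sink name (illustration only). */
final class RecordingSink implements ItemSink {
    private final String name;
    private final List<String> log;

    RecordingSink(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    @Override
    public void write(String item) {
        log.add(name + ":" + item);
    }
}

/** Forwards each item to every delegate, in order. */
final class CompositeSink implements ItemSink {
    private final List<ItemSink> delegates;

    CompositeSink(List<ItemSink> delegates) {
        this.delegates = List.copyOf(delegates);
    }

    @Override
    public void write(String item) {
        delegates.forEach(delegate -> delegate.write(item));
    }
}

final class SinkSuppliers {
    private SinkSuppliers() {}

    /** Picks a supplier for the mode; every get() call creates a new sink. */
    static Supplier<ItemSink> forMode(WriterMode mode, List<String> log) {
        return switch (mode) {
            case FILE -> () -> new RecordingSink("file", log);
            case GRPC -> () -> new RecordingSink("grpc", log);
            case FILE_AND_GRPC -> () -> new CompositeSink(
                    List.of(new RecordingSink("file", log), new RecordingSink("grpc", log)));
        };
    }
}
```

Keeping the fan-out inside one composite object means callers such as the block stream manager can keep working against a single writer interface regardless of the configured mode.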
@@ -468,7 +468,8 @@ private synchronized void finishProofWithSignature(
.siblingHashes(siblingHashes.stream().flatMap(List::stream).toList());
final var proofItem = BlockItem.newBuilder().blockProof(proof).build();
block.writer().writePbjItem(BlockItem.PROTOBUF.toBytes(proofItem));
- if (streamWriterType == BlockStreamWriterMode.FILE) {
+ if (streamWriterType == BlockStreamWriterMode.FILE
+ || streamWriterType == BlockStreamWriterMode.FILE_AND_GRPC) {
block.writer().closeBlock();
}
if (block.number() != blockNumber) {