Skip to content

Commit

Permalink
chore: Changed Roster to List<PeerInfo> wherever possible, in prepara…
Browse files Browse the repository at this point in the history
…tion for dynamic connections (#17858)

Signed-off-by: Artur Biesiadowski <[email protected]>
  • Loading branch information
abies authored Feb 12, 2025
1 parent 34dc57a commit 222f269
Show file tree
Hide file tree
Showing 17 changed files with 210 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (C) 2016-2024 Hedera Hashgraph, LLC
* Copyright (C) 2016-2025 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package com.swirlds.platform;

import com.hedera.hapi.node.state.roster.Roster;
import com.hedera.hapi.node.state.roster.RosterEntry;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.common.io.streams.SerializableDataInputStream;
import com.swirlds.common.io.streams.SerializableDataOutputStream;
Expand Down Expand Up @@ -379,12 +380,23 @@ public static boolean hasAnyCauseSuppliedType(
// Only include peers with valid gossip certificates
// https://github.com/hashgraph/hedera-services/issues/16648
.filter(entry -> CryptoStatic.checkCertificate((RosterUtils.fetchGossipCaCertificate(entry))))
.map(entry -> new PeerInfo(
NodeId.of(entry.nodeId()),
// Assume that the first ServiceEndpoint describes the external hostname,
// which is the same order in which RosterRetriever.buildRoster(AddressBook) lists them.
Objects.requireNonNull(RosterUtils.fetchHostname(entry, 0)),
Objects.requireNonNull(RosterUtils.fetchGossipCaCertificate(entry))))
.map(Utilities::toPeerInfo)
.toList();
}

/**
* Converts single roster entry to PeerInfo, which is more abstract class representing information about possible node connection
* @param entry data to convert
* @return PeerInfo with extracted hostname, port and certificate for remote host
*/
public static @NonNull PeerInfo toPeerInfo(@NonNull RosterEntry entry) {
Objects.requireNonNull(entry);
return new PeerInfo(
NodeId.of(entry.nodeId()),
// Assume that the first ServiceEndpoint describes the external hostname,
// which is the same order in which RosterRetriever.buildRoster(AddressBook) lists them.
Objects.requireNonNull(RosterUtils.fetchHostname(entry, 0)),
RosterUtils.fetchPort(entry, 0),
Objects.requireNonNull(RosterUtils.fetchGossipCaCertificate(entry)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public SyncGossip(
NetworkUtils.createSocketFactory(selfId, peers, keysAndCerts, platformContext.getConfiguration());
// create an instance that can create new outbound connections
final OutboundConnectionCreator connectionCreator =
new OutboundConnectionCreator(platformContext, selfId, this, socketFactory, roster);
new OutboundConnectionCreator(platformContext, selfId, this, socketFactory, peers);
connectionManagers = new StaticConnectionManagers(topology, connectionCreator);
final InboundConnectionHandler inboundConnectionHandler = new InboundConnectionHandler(
platformContext,
Expand Down Expand Up @@ -250,7 +250,7 @@ public SyncGossip(
networkMetrics = new NetworkMetrics(platformContext.getMetrics(), selfId, peers);
platformContext.getMetrics().addUpdater(networkMetrics::update);

reconnectMetrics = new ReconnectMetrics(platformContext.getMetrics(), roster);
reconnectMetrics = new ReconnectMetrics(platformContext.getMetrics(), peers);

final StateConfig stateConfig = platformContext.getConfiguration().getConfigData(StateConfig.class);

Expand Down Expand Up @@ -383,7 +383,7 @@ private void buildSyncProtocolThreads(
.setThreadName("SyncProtocolWith" + otherId)
.setHangingThreadPeriod(hangingThreadDuration)
.setWork(new ProtocolNegotiatorThread(
connectionManagers.getManager(otherId, topology.shouldConnectTo(otherId)),
connectionManagers.getManager(otherId),
syncConfig.syncSleepAfterFailedNegotiation(),
handshakeProtocols,
new NegotiationProtocols(List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,13 @@

package com.swirlds.platform.gossip.modular;

import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;

import com.hedera.hapi.node.state.roster.Roster;
import com.hedera.hapi.node.state.roster.RosterEntry;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.platform.NodeId;
import com.swirlds.common.threading.framework.StoppableThread;
import com.swirlds.common.threading.framework.config.StoppableThreadConfiguration;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.platform.Utilities;
import com.swirlds.platform.config.BasicConfig;
import com.swirlds.platform.config.ThreadConfig;
import com.swirlds.platform.crypto.CryptoStatic;
import com.swirlds.platform.crypto.KeysAndCerts;
import com.swirlds.platform.gossip.sync.config.SyncConfig;
import com.swirlds.platform.network.*;
Expand All @@ -42,12 +36,9 @@
import com.swirlds.platform.network.protocol.ProtocolRunnable;
import com.swirlds.platform.network.topology.StaticConnectionManagers;
import com.swirlds.platform.network.topology.StaticTopology;
import com.swirlds.platform.roster.RosterUtils;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.logging.log4j.LogManager;
Expand All @@ -63,48 +54,34 @@ public class PeerCommunication implements ConnectionTracker {

private final NetworkMetrics networkMetrics;
private final StaticTopology topology;
private final Roster roster;
private final KeysAndCerts keysAndCerts;
private final PlatformContext platformContext;
private final List<PeerInfo> peers;
private final PeerInfo selfPeer;

/**
* Create manager of communication with neighbouring nodes for exchanging events.
*
* @param platformContext the platform context
* @param roster the current roster
* @param selfId this node's ID
* @param peers the current list of peers
* @param selfPeer this node's data
* @param keysAndCerts private keys and public certificates
*/
public PeerCommunication(
@NonNull final PlatformContext platformContext,
@NonNull final Roster roster,
@NonNull final NodeId selfId,
@NonNull final List<PeerInfo> peers,
@NonNull final PeerInfo selfPeer,
@NonNull final KeysAndCerts keysAndCerts) {

this.roster = roster;
this.keysAndCerts = keysAndCerts;
this.platformContext = platformContext;
this.peers = peers;
this.selfPeer = selfPeer;

final RosterEntry selfEntry = RosterUtils.getRosterEntry(roster, selfId.id());
final X509Certificate selfCert = RosterUtils.fetchGossipCaCertificate(selfEntry);
if (!CryptoStatic.checkCertificate(selfCert)) {
// Do not make peer connections if the self node does not have a valid signing certificate in the roster.
// https://github.com/hashgraph/hedera-services/issues/16648
logger.error(
EXCEPTION.getMarker(),
"The gossip certificate for node {} is missing or invalid. "
+ "This node will not connect to any peers.",
selfId);
this.peers = Collections.emptyList();
} else {
this.peers = Utilities.createPeerInfoList(roster, selfId);
}

this.networkMetrics = new NetworkMetrics(platformContext.getMetrics(), selfId, this.peers);
this.networkMetrics = new NetworkMetrics(platformContext.getMetrics(), selfPeer.nodeId(), this.peers);
platformContext.getMetrics().addUpdater(networkMetrics::update);

this.topology = new StaticTopology(peers, selfId);
this.topology = new StaticTopology(peers, selfPeer.nodeId());
}

/**
Expand All @@ -123,6 +100,14 @@ public NetworkMetrics getNetworkMetrics() {
return networkMetrics;
}

/**
*
* @return list of peers for current static topology
*/
public List<PeerInfo> getPeers() {
return peers;
}

List<StoppableThread> buildProtocolThreads(
final ThreadManager threadManager,
final NodeId selfId,
Expand All @@ -141,7 +126,7 @@ List<StoppableThread> buildProtocolThreads(
NetworkUtils.createSocketFactory(selfId, peers, keysAndCerts, platformContext.getConfiguration());
// create an instance that can create new outbound connections
final OutboundConnectionCreator connectionCreator =
new OutboundConnectionCreator(platformContext, selfId, this, socketFactory, roster);
new OutboundConnectionCreator(platformContext, selfId, this, socketFactory, peers);
var connectionManagers = new StaticConnectionManagers(topology, connectionCreator);
final InboundConnectionHandler inboundConnectionHandler = new InboundConnectionHandler(
platformContext,
Expand All @@ -151,16 +136,15 @@ List<StoppableThread> buildProtocolThreads(
connectionManagers::newConnection,
platformContext.getTime());
// allow other members to create connections to me
final RosterEntry rosterEntry = RosterUtils.getRosterEntry(roster, selfId.id());
// Assume all ServiceEndpoints use the same port and use the port from the first endpoint.
// Previously, this code used a "local port" corresponding to the internal endpoint,
// which should normally be the second entry in the endpoints list if it's obtained via
// a regular AddressBook -> Roster conversion.
// The assumption must be correct, otherwise, if ports were indeed different, then the old code
// using the AddressBook would never have listened on a port associated with the external endpoint,
// thus not allowing anyone to connect to the node from outside the local network, which we'd have noticed.
final ConnectionServer connectionServer = new ConnectionServer(
threadManager, RosterUtils.fetchPort(rosterEntry, 0), socketFactory, inboundConnectionHandler::handle);
final ConnectionServer connectionServer =
new ConnectionServer(threadManager, selfPeer.port(), socketFactory, inboundConnectionHandler::handle);
syncProtocolThreads.add(new StoppableThreadConfiguration<>(threadManager)
.setPriority(threadConfig.threadPrioritySync())
.setNodeId(selfId)
Expand All @@ -180,7 +164,7 @@ List<StoppableThread> buildProtocolThreads(
.setThreadName("SyncProtocolWith" + otherId)
.setHangingThreadPeriod(hangingThreadDuration)
.setWork(new ProtocolNegotiatorThread(
connectionManagers.getManager(otherId, topology.shouldConnectTo(otherId)),
connectionManagers.getManager(otherId),
syncConfig.syncSleepAfterFailedNegotiation(),
handshakeProtocols,
new NegotiationProtocols(protocolList.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@

package com.swirlds.platform.gossip.modular;

import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;

import com.google.common.collect.ImmutableList;
import com.hedera.hapi.node.state.roster.Roster;
import com.hedera.hapi.node.state.roster.RosterEntry;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.merkle.synchronization.config.ReconnectConfig;
import com.swirlds.common.platform.NodeId;
import com.swirlds.common.threading.framework.StoppableThread;
import com.swirlds.common.threading.manager.ThreadManager;
import com.swirlds.common.threading.pool.CachedPoolParallelExecutor;
import com.swirlds.component.framework.model.WiringModel;
import com.swirlds.component.framework.wires.input.BindableInputWire;
import com.swirlds.component.framework.wires.output.StandardOutputWire;
import com.swirlds.platform.Utilities;
import com.swirlds.platform.consensus.EventWindow;
import com.swirlds.platform.crypto.CryptoStatic;
import com.swirlds.platform.crypto.KeysAndCerts;
import com.swirlds.platform.event.PlatformEvent;
import com.swirlds.platform.gossip.FallenBehindManagerImpl;
Expand All @@ -36,8 +42,10 @@
import com.swirlds.platform.gossip.shadowgraph.Shadowgraph;
import com.swirlds.platform.gossip.sync.SyncManagerImpl;
import com.swirlds.platform.gossip.sync.config.SyncConfig;
import com.swirlds.platform.network.PeerInfo;
import com.swirlds.platform.network.communication.handshake.VersionCompareHandshake;
import com.swirlds.platform.network.protocol.*;
import com.swirlds.platform.roster.RosterUtils;
import com.swirlds.platform.state.SwirldStateManager;
import com.swirlds.platform.state.service.PlatformStateFacade;
import com.swirlds.platform.state.signed.ReservedSignedState;
Expand All @@ -48,7 +56,9 @@
import com.swirlds.platform.wiring.NoInput;
import com.swirlds.platform.wiring.components.Gossip;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -102,29 +112,45 @@ public SyncGossipModular(
@NonNull final IntakeEventCounter intakeEventCounter,
@NonNull final PlatformStateFacade platformStateFacade) {

this.network = new PeerCommunication(platformContext, roster, selfId, keysAndCerts);
final RosterEntry selfEntry = RosterUtils.getRosterEntry(roster, selfId.id());
final X509Certificate selfCert = RosterUtils.fetchGossipCaCertificate(selfEntry);
final List<PeerInfo> peers;
if (!CryptoStatic.checkCertificate(selfCert)) {
// Do not make peer connections if the self node does not have a valid signing certificate in the roster.
// https://github.com/hashgraph/hedera-services/issues/16648
logger.error(
EXCEPTION.getMarker(),
"The gossip certificate for node {} is missing or invalid. "
+ "This node will not connect to any peers.",
selfId);
peers = Collections.emptyList();
} else {
peers = Utilities.createPeerInfoList(roster, selfId);
}
final PeerInfo selfPeer = Utilities.toPeerInfo(selfEntry);

this.network = new PeerCommunication(platformContext, peers, selfPeer, keysAndCerts);

var shadowgraph =
new Shadowgraph(platformContext, roster.rosterEntries().size(), intakeEventCounter);
final Shadowgraph shadowgraph = new Shadowgraph(platformContext, peers.size() + 1, intakeEventCounter);

var fallenBehindManager = new FallenBehindManagerImpl(
final FallenBehindManagerImpl fallenBehindManager = new FallenBehindManagerImpl(
selfId,
this.network.getTopology(),
statusActionSubmitter,
() -> sharedState.fallenBehindCallback().get().run(),
platformContext.getConfiguration().getConfigData(ReconnectConfig.class));

var syncManager = new SyncManagerImpl(platformContext, fallenBehindManager);
final SyncManagerImpl syncManager = new SyncManagerImpl(platformContext, fallenBehindManager);

var syncConfig = platformContext.getConfiguration().getConfigData(SyncConfig.class);
final SyncConfig syncConfig = platformContext.getConfiguration().getConfigData(SyncConfig.class);
final int permitCount;
if (syncConfig.onePermitPerPeer()) {
permitCount = roster.rosterEntries().size() - 1;
permitCount = peers.size();
} else {
permitCount = syncConfig.syncProtocolPermitCount();
}

var syncPermitProvider = new SyncPermitProvider(platformContext, permitCount);
final SyncPermitProvider syncPermitProvider = new SyncPermitProvider(platformContext, permitCount);

sharedState = new SyncGossipSharedProtocolState(
this.network.getNetworkMetrics(),
Expand All @@ -147,20 +173,22 @@ public SyncGossipModular(
threadManager,
latestCompleteState,
roster,
network.getPeers(),
loadReconnectState,
clearAllPipelinesForReconnect,
swirldStateManager,
selfId,
controller,
platformStateFacade),
SyncProtocol.create(platformContext, sharedState, intakeEventCounter, roster));
SyncProtocol.create(platformContext, sharedState, intakeEventCounter, peers.size() + 1));

final ProtocolConfig protocolConfig = platformContext.getConfiguration().getConfigData(ProtocolConfig.class);
final VersionCompareHandshake versionCompareHandshake =
new VersionCompareHandshake(appVersion, !protocolConfig.tolerateMismatchedVersion());
final List<ProtocolRunnable> handshakeProtocols = List.of(versionCompareHandshake);

var threads = network.buildProtocolThreads(threadManager, selfId, handshakeProtocols, protocols);
final List<StoppableThread> threads =
network.buildProtocolThreads(threadManager, selfId, handshakeProtocols, protocols);

controller.registerThingToStartButNotStop(sharedState.shadowgraphExecutor());
controller.registerThingsToStart(threads);
Expand Down
Loading

0 comments on commit 222f269

Please sign in to comment.