From eae0c4f79b86e09cf536c08f12542bc0b0e64ab7 Mon Sep 17 00:00:00 2001 From: Hristiyan Mitov <67628947+hMitov@users.noreply.github.com> Date: Wed, 29 Jan 2025 13:43:18 +0200 Subject: [PATCH] 403 process catchup request (#722) # Description - Implemented handling of catch up requests and introduced last finalized round reference to GrandpaSetState as per spec (It will be used in Play Grandpa Round and Process Catch-up Responses, too) - Added catch up threshold when sending catch up request as per spec - Extracted handling and sending of catch up requests to GrandpaSetState.java Fixes #403 > --------- Co-authored-by: Hristiyan Mitov --- .../limechain/grandpa/state/GrandpaRound.java | 2 + .../grandpa/state/GrandpaSetState.java | 82 +++++++++++++++++++ .../limechain/network/ConnectionManager.java | 5 ++ .../network/PeerMessageCoordinator.java | 14 +++- .../protocol/grandpa/GrandpaController.java | 12 ++- .../protocol/grandpa/GrandpaEngine.java | 52 ++++++------ .../protocol/grandpa/GrandpaService.java | 62 +++++++++----- .../catchup/req/CatchUpReqMessage.java | 2 + ...hUpMessage.java => CatchUpResMessage.java} | 4 +- ...java => CatchUpResMessageScaleReader.java} | 26 +++--- ...java => CatchUpResMessageScaleWriter.java} | 22 ++--- .../message/ProtocolMessageBuilder.java | 13 --- .../limechain/rpc/config/CommonConfig.java | 5 +- .../sync/warpsync/WarpSyncState.java | 17 ---- .../protocol/grandpa/GrandpaEngineTest.java | 8 +- ...CatchupResponseMessageScaleReaderTest.java | 4 +- 16 files changed, 216 insertions(+), 114 deletions(-) rename src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/{CatchUpMessage.java => CatchUpResMessage.java} (88%) rename src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/{CatchUpMessageScaleReader.java => CatchUpResMessageScaleReader.java} (60%) rename src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/{CatchUpMessageScaleWriter.java => CatchUpResMessageScaleWriter.java} (55%) diff --git a/src/main/java/com/limechain/grandpa/state/GrandpaRound.java b/src/main/java/com/limechain/grandpa/state/GrandpaRound.java index 7b440df45..c315d8708 100644 --- a/src/main/java/com/limechain/grandpa/state/GrandpaRound.java +++ b/src/main/java/com/limechain/grandpa/state/GrandpaRound.java @@ -4,6 +4,7 @@ import com.limechain.network.protocol.grandpa.messages.catchup.res.SignedVote; import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; import com.limechain.network.protocol.grandpa.messages.commit.Vote; +import com.limechain.network.protocol.warp.dto.BlockHeader; import io.emeraldpay.polkaj.types.Hash256; import lombok.Getter; import lombok.Setter; @@ -29,6 +30,7 @@ public class GrandpaRound implements Serializable { private Map preVotes = new ConcurrentHashMap<>(); private Map preCommits = new ConcurrentHashMap<>(); private SignedVote primaryVote; + private BlockHeader finalizedBlockHeader; private Map> pvEquivocations = new ConcurrentHashMap<>(); private Map> pcEquivocations = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java b/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java index 11cffeb8c..aa82f1f87 100644 --- a/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java +++ b/src/main/java/com/limechain/grandpa/state/GrandpaSetState.java @@ -4,12 +4,17 @@ import com.limechain.chain.lightsyncstate.Authority; import com.limechain.chain.lightsyncstate.LightSyncState; import com.limechain.exception.grandpa.GrandpaGenericException; +import com.limechain.network.PeerMessageCoordinator; +import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; +import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage; import com.limechain.network.protocol.grandpa.messages.catchup.res.SignedVote; import com.limechain.network.protocol.grandpa.messages.commit.Vote; import com.limechain.network.protocol.grandpa.messages.consensus.GrandpaConsensusMessage; +import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage; import com.limechain.network.protocol.grandpa.messages.vote.SignedMessage; import com.limechain.network.protocol.grandpa.messages.vote.Subround; import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage; +import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.runtime.Runtime; import com.limechain.state.AbstractState; import com.limechain.storage.DBConstants; @@ -22,6 +27,7 @@ import com.limechain.sync.warpsync.dto.ForcedAuthoritySetChange; import com.limechain.sync.warpsync.dto.ScheduledAuthoritySetChange; import io.emeraldpay.polkaj.types.Hash256; +import io.libp2p.core.PeerId; import io.libp2p.core.crypto.PubKey; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -37,6 +43,8 @@ import java.util.Map; import java.util.Optional; import java.util.PriorityQueue; +import java.util.Set; +import java.util.function.Supplier; import java.util.logging.Level; /** @@ -52,6 +60,7 @@ public class GrandpaSetState extends AbstractState implements ServiceConsensusState { private static final BigInteger THRESHOLD_DENOMINATOR = BigInteger.valueOf(3); + private static final BigInteger CATCH_UP_THRESHOLD = BigInteger.TWO; private List authorities; private BigInteger disabledAuthority; @@ -61,6 +70,7 @@ public class GrandpaSetState extends AbstractState implements ServiceConsensusSt private final RoundCache roundCache; private final KeyStore keyStore; private final KVRepository repository; + private final PeerMessageCoordinator messageCoordinator; private final PriorityQueue authoritySetChanges = new PriorityQueue<>(AuthoritySetChange.getComparator()); @@ -272,4 +282,76 @@ private void updateAuthorityStatus() { keyPair.ifPresentOrElse(AbstractState::setAuthorityStatus, AbstractState::clearAuthorityStatus); } + + public void initiateAndSendCatchUpRequest(NeighbourMessage neighbourMessage, PeerId peerId) { + // If peer has the same voter set id + if (neighbourMessage.getSetId().equals(setId)) { + + // Check if needed to catch-up peer + if (neighbourMessage.getRound().compareTo( + fetchLatestRound().getRoundNumber().add(CATCH_UP_THRESHOLD)) >= 0) { + log.log(Level.FINE, "Neighbor message indicates that the round of Peer " + peerId + " is ahead."); + + CatchUpReqMessage catchUpReqMessage = CatchUpReqMessage.builder() + .round(neighbourMessage.getRound()) + .setId(neighbourMessage.getSetId()).build(); + + messageCoordinator.sendCatchUpRequestToPeer(peerId, catchUpReqMessage); + } + } + } + + public void initiateAndSendCatchUpResponse(PeerId peerId, + CatchUpReqMessage catchUpReqMessage, + Supplier> peerIds) { + + if (!peerIds.get().contains(peerId)) { + throw new GrandpaGenericException("Requesting catching up from a non-peer."); + } + + if (!catchUpReqMessage.getSetId().equals(setId)) { + throw new GrandpaGenericException("Catch up message has a different setId."); + } + + if (catchUpReqMessage.getRound().compareTo(fetchLatestRound().getRoundNumber()) > 0) { + throw new GrandpaGenericException("Catching up on a round in the future."); + } + + GrandpaRound selectedGrandpaRound = selectRound(catchUpReqMessage.getRound(), catchUpReqMessage.getSetId()) + .orElseThrow(() -> new GrandpaGenericException("Target round was no found.")); + + SignedVote[] preCommits = selectedGrandpaRound.getPreCommits().values().toArray(SignedVote[]::new); + SignedVote[] preVotes = selectedGrandpaRound.getPreVotes().values().toArray(SignedVote[]::new); + + BlockHeader finalizedBlockHeader = selectedGrandpaRound.getFinalizedBlockHeader(); + + CatchUpResMessage catchUpResMessage = CatchUpResMessage.builder() + .round(selectedGrandpaRound.getRoundNumber()) + .setId(setId) + .preCommits(preCommits) + .preVotes(preVotes) + .blockHash(finalizedBlockHeader.getHash()) + .blockNumber(finalizedBlockHeader.getBlockNumber()) + .build(); + + messageCoordinator.sendCatchUpResponseToPeer(peerId, catchUpResMessage); + } + + private Optional selectRound(BigInteger peerRoundNumber, BigInteger peerSetId) { + GrandpaRound round = roundCache.getLatestRound(setId); + + while (round != null) { + // Round found + // Check voter set + if (round.getRoundNumber().equals(peerRoundNumber)) { + if (round.getCommitMessagesArchive().getFirst().getSetId().equals(peerSetId)) { + break; + } + } + // Go to the previous round + round = round.getPrevious(); + } + + return Optional.ofNullable(round); + } } \ No newline at end of file diff --git a/src/main/java/com/limechain/network/ConnectionManager.java b/src/main/java/com/limechain/network/ConnectionManager.java index f64cc7f70..68acbfed5 100644 --- a/src/main/java/com/limechain/network/ConnectionManager.java +++ b/src/main/java/com/limechain/network/ConnectionManager.java @@ -3,6 +3,7 @@ import com.limechain.network.dto.PeerInfo; import com.limechain.network.dto.ProtocolStreamType; import com.limechain.network.dto.ProtocolStreams; +import com.limechain.network.protocol.blockannounce.NodeRole; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; import com.limechain.network.protocol.warp.dto.BlockHeader; @@ -252,4 +253,8 @@ private void closeProtocolStream(final ProtocolStreams streams) { streams.getResponder().close(); } } + + public boolean checkIfPeerIsAuthorNode(PeerId peerId) { + return NodeRole.AUTHORING.getValue().equals(getPeerInfo(peerId).getNodeRole()); + } } diff --git a/src/main/java/com/limechain/network/PeerMessageCoordinator.java b/src/main/java/com/limechain/network/PeerMessageCoordinator.java index 27f84811b..869988d0b 100644 --- a/src/main/java/com/limechain/network/PeerMessageCoordinator.java +++ b/src/main/java/com/limechain/network/PeerMessageCoordinator.java @@ -4,6 +4,10 @@ import com.limechain.network.protocol.blockannounce.NodeRole; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleWriter; +import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; +import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleWriter; +import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage; +import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessageScaleWriter; import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; import com.limechain.network.protocol.grandpa.messages.commit.CommitMessageScaleWriter; import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage; @@ -113,8 +117,14 @@ public void sendCommitMessageToPeers(CommitMessage commitMessage) { }); } - public void sendCatchUpRequestToPeer(PeerId peerId) { - network.getGrandpaService().sendCatchUpRequest(network.getHost(), peerId); + public void sendCatchUpRequestToPeer(PeerId peerId, CatchUpReqMessage catchUpReqMessage) { + byte[] scaleMessage = ScaleUtils.Encode.encode(CatchUpReqMessageScaleWriter.getInstance(), catchUpReqMessage); + network.getGrandpaService().sendCatchUpRequest(network.getHost(), peerId, scaleMessage); + } + + public void sendCatchUpResponseToPeer(PeerId peerId, CatchUpResMessage catchUpResMessage) { + byte[] scaleMessage = ScaleUtils.Encode.encode(CatchUpResMessageScaleWriter.getInstance(), catchUpResMessage); + network.getGrandpaService().sendCatchUpResponse(network.getHost(), peerId, scaleMessage); } public void sendVoteMessageToPeers(VoteMessage voteMessage) { diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java index 824de4d59..422ede4c1 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaController.java @@ -37,8 +37,15 @@ public void sendCommitMessage(byte[] encodedCommitMessage) { /** * Sends a catch-up request message over the controller stream. */ - public void sendCatchUpRequest() { - engine.writeCatchUpRequest(stream, stream.remotePeerId()); + public void sendCatchUpRequest(byte[] encodedCatchUpReqMessage) { + engine.writeCatchUpRequest(stream, encodedCatchUpReqMessage); + } + + /** + * Sends a catch-up response message over the controller stream. + */ + public void sendCatchUpResponse(byte[] encodedCatchUpResMessage) { + engine.writeCatchUpResponse(stream, encodedCatchUpResMessage); } /** @@ -47,5 +54,4 @@ public void sendCatchUpRequest() { public void sendVoteMessage(byte[] encodedVoteMessage) { engine.writeCommitMessage(stream, encodedVoteMessage); } - } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java index b8f1251e1..228c5641e 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaEngine.java @@ -4,14 +4,12 @@ import com.limechain.grandpa.GrandpaService; import com.limechain.grandpa.state.GrandpaSetState; import com.limechain.network.ConnectionManager; -import com.limechain.network.protocol.blockannounce.NodeRole; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; import com.limechain.network.protocol.grandpa.messages.GrandpaMessageType; import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleReader; -import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleWriter; -import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpMessage; -import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpMessageScaleReader; +import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage; +import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessageScaleReader; import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; import com.limechain.network.protocol.grandpa.messages.commit.CommitMessageScaleReader; import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage; @@ -151,8 +149,8 @@ private void handleNeighbourMessage(byte[] message, PeerId peerId) { log.log(Level.FINE, "Received neighbour message from Peer " + peerId + "\n" + neighbourMessage); new Thread(() -> warpSyncState.syncNeighbourMessage(neighbourMessage, peerId)).start(); - if (NodeRole.AUTHORING.getValue().equals(connectionManager.getPeerInfo(peerId).getNodeRole())) { - warpSyncState.checkAndInitiateCatchUp(neighbourMessage, peerId); + if (AbstractState.isActiveAuthority() && connectionManager.checkIfPeerIsAuthorNode(peerId)) { + grandpaSetState.initiateAndSendCatchUpRequest(neighbourMessage, peerId); } } @@ -174,15 +172,18 @@ private void handleCommitMessage(byte[] message, PeerId peerId) { private void handleCatchupRequestMessage(byte[] message, PeerId peerId) { ScaleCodecReader reader = new ScaleCodecReader(message); CatchUpReqMessage catchUpReqMessage = reader.read(CatchUpReqMessageScaleReader.getInstance()); - //todo: handle catchup req message (authoring node responsibility) log.log(Level.INFO, "Received catch up request message from Peer " + peerId + "\n" + catchUpReqMessage); + + if (AbstractState.isActiveAuthority() && connectionManager.checkIfPeerIsAuthorNode(peerId)) { + grandpaSetState.initiateAndSendCatchUpResponse(peerId, catchUpReqMessage, connectionManager::getPeerIds); + } } private void handleCatchupResponseMessage(byte[] message, PeerId peerId) { ScaleCodecReader reader = new ScaleCodecReader(message); - CatchUpMessage catchUpMessage = reader.read(CatchUpMessageScaleReader.getInstance()); + CatchUpResMessage catchUpResMessage = reader.read(CatchUpResMessageScaleReader.getInstance()); //todo: handle catchup res message (authoring node responsibility) - log.log(Level.INFO, "Received catch up message from Peer " + peerId + "\n" + catchUpMessage); + log.log(Level.INFO, "Received catch up response message from Peer " + peerId + "\n" + catchUpResMessage); } /** @@ -214,7 +215,7 @@ public void writeNeighbourMessage(Stream stream, PeerId peerId) { throw new ScaleEncodingException(e); } - log.log(Level.FINE, "Sending neighbour message to peer " + peerId); + log.log(Level.FINE, "Sending neighbour message to Peer " + peerId); stream.writeAndFlush(buf.toByteArray()); } @@ -225,29 +226,30 @@ public void writeNeighbourMessage(Stream stream, PeerId peerId) { * @param encodedCommitMessage scale encoded CommitMessage object */ public void writeCommitMessage(Stream stream, byte[] encodedCommitMessage) { - log.log(Level.FINE, "Sending commit message to peer " + stream.remotePeerId()); + log.log(Level.FINE, "Sending commit message to Peer " + stream.remotePeerId()); stream.writeAndFlush(encodedCommitMessage); } /** * Send our GRANDPA catch-up request message on a given responder stream. * - * @param stream responder stream to write the message to - * @param peerId peer to send to + * @param stream responder stream to write the message to + * @param encodedCatchUpReqMessage scale encoded CatchUpRequestMessage object */ - public void writeCatchUpRequest(Stream stream, PeerId peerId) { - ByteArrayOutputStream buf = new ByteArrayOutputStream(); - try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) { - writer.write( - CatchUpReqMessageScaleWriter.getInstance(), - ProtocolMessageBuilder.buildCatchUpRequestMessage() - ); - } catch (IOException e) { - throw new ScaleEncodingException(e); - } + public void writeCatchUpRequest(Stream stream, byte[] encodedCatchUpReqMessage) { + log.log(Level.FINE, "Sending catch up request to Peer " + stream.remotePeerId()); + stream.writeAndFlush(encodedCatchUpReqMessage); + } - log.log(Level.FINE, "Sending catch up request to Peer " + peerId); - stream.writeAndFlush(buf.toByteArray()); + /** + * Send our GRANDPA catch-up response message on a given responder stream. + * + * @param stream responder stream to write the message to + * @param encodedCatchUpResMessage scale encoded CatchUpResMessage object + */ + public void writeCatchUpResponse(Stream stream, byte[] encodedCatchUpResMessage) { + log.log(Level.FINE, "Sending catch up response to Peer " + stream.remotePeerId()); + stream.writeAndFlush(encodedCatchUpResMessage); } /** diff --git a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java index fdbe62dd8..0fadc8afe 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/GrandpaService.java @@ -37,6 +37,11 @@ public void sendNeighbourMessage(Host us, PeerId peerId) { ); } + private void sendNeighbourMessage(Stream stream) { + GrandpaController controller = new GrandpaController(stream); + controller.sendNeighbourMessage(); + } + /** * Sends a commit message to a peer. If there is no initiator stream opened with the peer, * sends a handshake instead. @@ -54,12 +59,46 @@ public void sendCommitMessage(Host us, PeerId peerId, byte[] encodedCommitMessag ); } + /** + * Sends a catch-up request message to a peer. If there is no initiator stream opened with the peer, + * sends a handshake instead. + * + * @param us our host object + * @param peerId message receiver + * @param encodedCatchUpReqMessage scale encoded representation of the CatchUpReqMessage object + */ + public void sendCatchUpRequest(Host us, PeerId peerId, byte[] encodedCatchUpReqMessage) { + Optional.ofNullable(connectionManager.getPeerInfo(peerId)) + .map(p -> p.getGrandpaStreams().getInitiator()) + .ifPresentOrElse( + stream -> new GrandpaController(stream).sendCatchUpRequest(encodedCatchUpReqMessage), + () -> sendHandshake(us, peerId) + ); + } + + /** + * Sends a catch-up response message to a peer. If there is no initiator stream opened with the peer, + * sends a handshake instead. + * + * @param us our host object + * @param peerId message receiver + * @param encodedCatchUpResMessage scale encoded representation of the CatchUpResMessage object + */ + public void sendCatchUpResponse(Host us, PeerId peerId, byte[] encodedCatchUpResMessage) { + Optional.ofNullable(connectionManager.getPeerInfo(peerId)) + .map(p -> p.getGrandpaStreams().getInitiator()) + .ifPresentOrElse( + stream -> new GrandpaController(stream).sendCatchUpResponse(encodedCatchUpResMessage), + () -> sendHandshake(us, peerId) + ); + } + /** * Sends a vote message to a peer. If there is no initiator stream opened with the peer, * sends a handshake instead. * - * @param us our host object - * @param peerId message receiver + * @param us our host object + * @param peerId message receiver * @param encodedVoteMessage scale encoded representation of the VoteMessage object */ public void sendVoteMessage(Host us, PeerId peerId, byte[] encodedVoteMessage) { @@ -71,11 +110,6 @@ public void sendVoteMessage(Host us, PeerId peerId, byte[] encodedVoteMessage) { ); } - private void sendNeighbourMessage(Stream stream) { - GrandpaController controller = new GrandpaController(stream); - controller.sendNeighbourMessage(); - } - public void sendHandshake(Host us, PeerId peerId) { try { GrandpaController controller = this.protocol.dialPeer(us, peerId, us.getAddressBook()); @@ -84,18 +118,4 @@ public void sendHandshake(Host us, PeerId peerId) { log.warning("Failed to send Grandpa handshake to " + peerId); } } - - public void sendCatchUpRequest(Host us, PeerId peerId) { - Optional.ofNullable(connectionManager.getPeerInfo(peerId)) - .map(p -> p.getGrandpaStreams().getInitiator()) - .ifPresentOrElse( - this::sendCatchUpRequest, - () -> sendHandshake(us, peerId) - ); - } - - private void sendCatchUpRequest(Stream stream) { - GrandpaController controller = new GrandpaController(stream); - controller.sendCatchUpRequest(); - } } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/req/CatchUpReqMessage.java b/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/req/CatchUpReqMessage.java index 0024017ba..7f9ee1509 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/req/CatchUpReqMessage.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/req/CatchUpReqMessage.java @@ -1,6 +1,7 @@ package com.limechain.network.protocol.grandpa.messages.catchup.req; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @@ -9,6 +10,7 @@ @Data @AllArgsConstructor @NoArgsConstructor +@Builder public class CatchUpReqMessage { private BigInteger round; private BigInteger setId; diff --git a/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessage.java b/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessage.java similarity index 88% rename from src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessage.java rename to src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessage.java index b81638494..4b6a80013 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessage.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessage.java @@ -2,6 +2,7 @@ import io.emeraldpay.polkaj.types.Hash256; import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @@ -10,7 +11,8 @@ @Data @AllArgsConstructor @NoArgsConstructor -public class CatchUpMessage { +@Builder +public class CatchUpResMessage { private BigInteger round; private BigInteger setId; private SignedVote[] preVotes; diff --git a/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessageScaleReader.java b/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessageScaleReader.java similarity index 60% rename from src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessageScaleReader.java rename to src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessageScaleReader.java index 7675c7290..76c3ddddc 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessageScaleReader.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessageScaleReader.java @@ -11,42 +11,42 @@ import java.math.BigInteger; -public class CatchUpMessageScaleReader implements ScaleReader { +public class CatchUpResMessageScaleReader implements ScaleReader { - private static final CatchUpMessageScaleReader INSTANCE = new CatchUpMessageScaleReader(); + private static final CatchUpResMessageScaleReader INSTANCE = new CatchUpResMessageScaleReader(); private final UInt32Reader uint32Reader; private final UInt64Reader uint64Reader; private final ListReader signedVoteListReader; - private CatchUpMessageScaleReader() { + private CatchUpResMessageScaleReader() { uint32Reader = new UInt32Reader(); uint64Reader = new UInt64Reader(); signedVoteListReader = new ListReader<>(SignedVoteScaleReader.getInstance()); } - public static CatchUpMessageScaleReader getInstance() { + public static CatchUpResMessageScaleReader getInstance() { return INSTANCE; } @Override - public CatchUpMessage read(ScaleCodecReader reader) { + public CatchUpResMessage read(ScaleCodecReader reader) { int messageType = reader.readByte(); if (messageType != GrandpaMessageType.CATCH_UP_RESPONSE.getType()) { throw new WrongMessageTypeException( String.format("Trying to read message of type %d as a catch up response message", messageType)); } - CatchUpMessage catchUpMessage = new CatchUpMessage(); - catchUpMessage.setSetId(uint64Reader.read(reader)); - catchUpMessage.setRound(uint64Reader.read(reader)); - catchUpMessage.setPreVotes(signedVoteListReader + CatchUpResMessage catchUpResMessage = new CatchUpResMessage(); + catchUpResMessage.setSetId(uint64Reader.read(reader)); + catchUpResMessage.setRound(uint64Reader.read(reader)); + catchUpResMessage.setPreVotes(signedVoteListReader .read(reader).toArray(SignedVote[]::new)); - catchUpMessage.setPreCommits(signedVoteListReader + catchUpResMessage.setPreCommits(signedVoteListReader .read(reader).toArray(SignedVote[]::new)); - catchUpMessage.setBlockHash(new Hash256(reader.readUint256())); - catchUpMessage.setBlockNumber(BigInteger.valueOf(uint32Reader.read(reader))); + catchUpResMessage.setBlockHash(new Hash256(reader.readUint256())); + catchUpResMessage.setBlockNumber(BigInteger.valueOf(uint32Reader.read(reader))); - return catchUpMessage; + return catchUpResMessage; } } diff --git a/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessageScaleWriter.java b/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessageScaleWriter.java similarity index 55% rename from src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessageScaleWriter.java rename to src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessageScaleWriter.java index bd5397a76..24fa9fb17 100644 --- a/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpMessageScaleWriter.java +++ b/src/main/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchUpResMessageScaleWriter.java @@ -9,29 +9,29 @@ import java.io.IOException; import java.util.Arrays; -public class CatchUpMessageScaleWriter implements ScaleWriter { - private static final CatchUpMessageScaleWriter INSTANCE = new CatchUpMessageScaleWriter(); +public class CatchUpResMessageScaleWriter implements ScaleWriter { + private static final CatchUpResMessageScaleWriter INSTANCE = new CatchUpResMessageScaleWriter(); private final UInt64Writer uint64Writer; private final ListWriter signedVoteListWriter; - private CatchUpMessageScaleWriter() { + private CatchUpResMessageScaleWriter() { uint64Writer = new UInt64Writer(); signedVoteListWriter = new ListWriter<>(SignedVoteScaleWriter.getInstance()); } - public static CatchUpMessageScaleWriter getInstance() { + public static CatchUpResMessageScaleWriter getInstance() { return INSTANCE; } @Override - public void write(ScaleCodecWriter writer, CatchUpMessage catchUpMessage) throws IOException { + public void write(ScaleCodecWriter writer, CatchUpResMessage catchUpResMessage) throws IOException { writer.writeByte(GrandpaMessageType.CATCH_UP_RESPONSE.getType()); - uint64Writer.write(writer, catchUpMessage.getSetId()); - uint64Writer.write(writer, catchUpMessage.getRound()); - signedVoteListWriter.write(writer, Arrays.asList(catchUpMessage.getPreVotes())); - signedVoteListWriter.write(writer, Arrays.asList(catchUpMessage.getPreCommits())); - writer.writeUint256(catchUpMessage.getBlockHash().getBytes()); - writer.writeUint32(catchUpMessage.getBlockNumber().longValue()); + uint64Writer.write(writer, catchUpResMessage.getSetId()); + uint64Writer.write(writer, catchUpResMessage.getRound()); + signedVoteListWriter.write(writer, Arrays.asList(catchUpResMessage.getPreVotes())); + signedVoteListWriter.write(writer, Arrays.asList(catchUpResMessage.getPreCommits())); + writer.writeUint256(catchUpResMessage.getBlockHash().getBytes()); + writer.writeUint32(catchUpResMessage.getBlockNumber().longValue()); } } diff --git a/src/main/java/com/limechain/network/protocol/message/ProtocolMessageBuilder.java b/src/main/java/com/limechain/network/protocol/message/ProtocolMessageBuilder.java index 03f181818..4df01f069 100644 --- a/src/main/java/com/limechain/network/protocol/message/ProtocolMessageBuilder.java +++ b/src/main/java/com/limechain/network/protocol/message/ProtocolMessageBuilder.java @@ -2,7 +2,6 @@ import com.limechain.grandpa.state.GrandpaSetState; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; -import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage; import com.limechain.network.protocol.warp.dto.BlockHeader; import com.limechain.rpc.server.AppBean; @@ -34,16 +33,4 @@ public BlockAnnounceMessage buildBlockAnnounceMessage(BlockHeader blockHeader, b isBestBlock ); } - - public CatchUpReqMessage buildCatchUpRequestMessage() { - GrandpaSetState grandpaSetState = AppBean.getBean(GrandpaSetState.class); - - BigInteger setId = grandpaSetState.getSetId(); - BigInteger roundNumber = grandpaSetState.getRoundCache().getLatestRoundNumber(setId); - - return new CatchUpReqMessage( - roundNumber, - setId - ); - } } diff --git a/src/main/java/com/limechain/rpc/config/CommonConfig.java b/src/main/java/com/limechain/rpc/config/CommonConfig.java index 9d292f1fd..5dec6d7e1 100644 --- a/src/main/java/com/limechain/rpc/config/CommonConfig.java +++ b/src/main/java/com/limechain/rpc/config/CommonConfig.java @@ -86,8 +86,9 @@ public SystemInfo systemInfo(HostConfig hostConfig, NetworkService network, Sync public GrandpaSetState grandpaSetState(RoundCache roundCache, KeyStore keyStore, KVRepository repository, - BlockState blockState) { - return new GrandpaSetState(blockState, roundCache, keyStore, repository); + BlockState blockState, + PeerMessageCoordinator messageCoordinator) { + return new GrandpaSetState(blockState, roundCache, keyStore, repository, messageCoordinator); } @Bean diff --git a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java index 568c8b0af..5342766f1 100644 --- a/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java +++ b/src/main/java/com/limechain/sync/warpsync/WarpSyncState.java @@ -3,7 +3,6 @@ import com.limechain.exception.global.RuntimeCodeException; import com.limechain.exception.trie.TrieDecoderException; import com.limechain.grandpa.state.GrandpaSetState; -import com.limechain.grandpa.state.RoundCache; import com.limechain.network.PeerMessageCoordinator; import com.limechain.network.PeerRequester; import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage; @@ -277,22 +276,6 @@ public void syncNeighbourMessage(NeighbourMessage neighbourMessage, PeerId peerI } } - public void checkAndInitiateCatchUp(NeighbourMessage neighbourMessage, PeerId peerId) { - - GrandpaSetState grandpaSetState = stateManager.getGrandpaSetState(); - // If peer has the same voter set id - if (neighbourMessage.getSetId().equals(grandpaSetState.getSetId())) { - RoundCache roundCache = grandpaSetState.getRoundCache(); - BigInteger latestRound = roundCache.getLatestRoundNumber(grandpaSetState.getSetId()); - - // Check if needed to catch-up peer - if (neighbourMessage.getRound().compareTo(latestRound) > 0) { - log.log(Level.FINE, "Neighbor message indicates that the round of Peer " + peerId + " is ahead."); - messageCoordinator.sendCatchUpRequestToPeer(peerId); - } - } - } - private void updateSetData(BigInteger setChangeBlock) { List response = requester.requestBlockData( diff --git a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java index 4ba1468ea..ff2c1aaf2 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/GrandpaEngineTest.java @@ -8,8 +8,8 @@ import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder; import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage; import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleReader; -import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpMessage; -import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpMessageScaleReader; +import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage; +import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessageScaleReader; import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage; import com.limechain.network.protocol.grandpa.messages.commit.CommitMessageScaleReader; import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage; @@ -272,14 +272,14 @@ void receiveCatchUpRequestMessageOnResponderStreamShouldLogAndIgnore() { @Test void receiveCatchUpResponseMessageOnResponderStreamShouldLogAndIgnore() { byte[] message = new byte[]{4, 2, 3}; - CatchUpMessage catchUpMessage = mock(CatchUpMessage.class); + CatchUpResMessage catchUpResMessage = mock(CatchUpResMessage.class); when(stream.isInitiator()).thenReturn(false); when(stream.remotePeerId()).thenReturn(peerId); when(connectionManager.isGrandpaConnected(peerId)).thenReturn(true); try (MockedConstruction readerMock = mockConstruction(ScaleCodecReader.class, (mock, context) - -> when(mock.read(any(CatchUpMessageScaleReader.class))).thenReturn(catchUpMessage)) + -> when(mock.read(any(CatchUpResMessageScaleReader.class))).thenReturn(catchUpResMessage)) ) { grandpaEngine.receiveRequest(message, stream); diff --git a/src/test/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchupResponseMessageScaleReaderTest.java b/src/test/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchupResponseMessageScaleReaderTest.java index d86483730..af12222ed 100644 --- a/src/test/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchupResponseMessageScaleReaderTest.java +++ b/src/test/java/com/limechain/network/protocol/grandpa/messages/catchup/res/CatchupResponseMessageScaleReaderTest.java @@ -16,10 +16,10 @@ class CatchupResponseMessageScaleReaderTest { void encodeAndDecodeEqualsTest() throws IOException { byte[] expectedByteArr = HexUtils.fromHexString("0401000000000000000100000000000000040a0b0c0d00000000000000000000000000000000000000000000000000000000e7030000010203040000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000506070800000000000000000000000000000000000000000000000000000000040a0b0c0d000000000000000000000000000000000000000000000000000000004d010000010203040000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000506070800000000000000000000000000000000000000000000000000000000700e648a80bf01944ca2a5ae4da4fea86810d02b549d1e399c06eee938b973f101000000"); ScaleCodecReader reader = new ScaleCodecReader(expectedByteArr); - CatchUpMessage commitMessage = reader.read(CatchUpMessageScaleReader.getInstance()); + CatchUpResMessage catchUpResMessage = reader.read(CatchUpResMessageScaleReader.getInstance()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - CatchUpMessageScaleWriter.getInstance().write(new ScaleCodecWriter(baos), commitMessage); + CatchUpResMessageScaleWriter.getInstance().write(new ScaleCodecWriter(baos), catchUpResMessage); assertArrayEquals(expectedByteArr, baos.toByteArray()); }