Skip to content

Commit

Permalink
403 process catchup request (#722)
Browse files Browse the repository at this point in the history
# Description

- Implemented handling of catch up requests and introduced last
finalized round reference to GrandpaSetState as per spec (It will be
used in Play Grandpa Round and Process Catch-up Responses, too)
- Added catch up threshold when sending catch up request as per spec
- Extracted handling and sending of catch up requests to
GrandpaSetState.java

Fixes #403 >

---------

Co-authored-by: Hristiyan Mitov <[email protected]>
  • Loading branch information
hMitov and Hristiyan Mitov authored Jan 29, 2025
1 parent b83e55f commit eae0c4f
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 114 deletions.
2 changes: 2 additions & 0 deletions src/main/java/com/limechain/grandpa/state/GrandpaRound.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.limechain.network.protocol.grandpa.messages.catchup.res.SignedVote;
import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage;
import com.limechain.network.protocol.grandpa.messages.commit.Vote;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import io.emeraldpay.polkaj.types.Hash256;
import lombok.Getter;
import lombok.Setter;
Expand All @@ -29,6 +30,7 @@ public class GrandpaRound implements Serializable {
private Map<Hash256, SignedVote> preVotes = new ConcurrentHashMap<>();
private Map<Hash256, SignedVote> preCommits = new ConcurrentHashMap<>();
private SignedVote primaryVote;
private BlockHeader finalizedBlockHeader;

private Map<Hash256, Set<SignedVote>> pvEquivocations = new ConcurrentHashMap<>();
private Map<Hash256, Set<SignedVote>> pcEquivocations = new ConcurrentHashMap<>();
Expand Down
82 changes: 82 additions & 0 deletions src/main/java/com/limechain/grandpa/state/GrandpaSetState.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
import com.limechain.chain.lightsyncstate.Authority;
import com.limechain.chain.lightsyncstate.LightSyncState;
import com.limechain.exception.grandpa.GrandpaGenericException;
import com.limechain.network.PeerMessageCoordinator;
import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage;
import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage;
import com.limechain.network.protocol.grandpa.messages.catchup.res.SignedVote;
import com.limechain.network.protocol.grandpa.messages.commit.Vote;
import com.limechain.network.protocol.grandpa.messages.consensus.GrandpaConsensusMessage;
import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage;
import com.limechain.network.protocol.grandpa.messages.vote.SignedMessage;
import com.limechain.network.protocol.grandpa.messages.vote.Subround;
import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage;
import com.limechain.network.protocol.warp.dto.BlockHeader;
import com.limechain.runtime.Runtime;
import com.limechain.state.AbstractState;
import com.limechain.storage.DBConstants;
Expand All @@ -22,6 +27,7 @@
import com.limechain.sync.warpsync.dto.ForcedAuthoritySetChange;
import com.limechain.sync.warpsync.dto.ScheduledAuthoritySetChange;
import io.emeraldpay.polkaj.types.Hash256;
import io.libp2p.core.PeerId;
import io.libp2p.core.crypto.PubKey;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
Expand All @@ -37,6 +43,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Supplier;
import java.util.logging.Level;

/**
Expand All @@ -52,6 +60,7 @@
public class GrandpaSetState extends AbstractState implements ServiceConsensusState {

private static final BigInteger THRESHOLD_DENOMINATOR = BigInteger.valueOf(3);
private static final BigInteger CATCH_UP_THRESHOLD = BigInteger.TWO;

private List<Authority> authorities;
private BigInteger disabledAuthority;
Expand All @@ -61,6 +70,7 @@ public class GrandpaSetState extends AbstractState implements ServiceConsensusSt
private final RoundCache roundCache;
private final KeyStore keyStore;
private final KVRepository<String, Object> repository;
private final PeerMessageCoordinator messageCoordinator;

private final PriorityQueue<AuthoritySetChange> authoritySetChanges =
new PriorityQueue<>(AuthoritySetChange.getComparator());
Expand Down Expand Up @@ -272,4 +282,76 @@ private void updateAuthorityStatus() {

keyPair.ifPresentOrElse(AbstractState::setAuthorityStatus, AbstractState::clearAuthorityStatus);
}

public void initiateAndSendCatchUpRequest(NeighbourMessage neighbourMessage, PeerId peerId) {
// If peer has the same voter set id
if (neighbourMessage.getSetId().equals(setId)) {

// Check if needed to catch-up peer
if (neighbourMessage.getRound().compareTo(
fetchLatestRound().getRoundNumber().add(CATCH_UP_THRESHOLD)) >= 0) {
log.log(Level.FINE, "Neighbor message indicates that the round of Peer " + peerId + " is ahead.");

CatchUpReqMessage catchUpReqMessage = CatchUpReqMessage.builder()
.round(neighbourMessage.getRound())
.setId(neighbourMessage.getSetId()).build();

messageCoordinator.sendCatchUpRequestToPeer(peerId, catchUpReqMessage);
}
}
}

public void initiateAndSendCatchUpResponse(PeerId peerId,
CatchUpReqMessage catchUpReqMessage,
Supplier<Set<PeerId>> peerIds) {

if (!peerIds.get().contains(peerId)) {
throw new GrandpaGenericException("Requesting catching up from a non-peer.");
}

if (!catchUpReqMessage.getSetId().equals(setId)) {
throw new GrandpaGenericException("Catch up message has a different setId.");
}

if (catchUpReqMessage.getRound().compareTo(fetchLatestRound().getRoundNumber()) > 0) {
throw new GrandpaGenericException("Catching up on a round in the future.");
}

GrandpaRound selectedGrandpaRound = selectRound(catchUpReqMessage.getRound(), catchUpReqMessage.getSetId())
.orElseThrow(() -> new GrandpaGenericException("Target round was no found."));

SignedVote[] preCommits = selectedGrandpaRound.getPreCommits().values().toArray(SignedVote[]::new);
SignedVote[] preVotes = selectedGrandpaRound.getPreVotes().values().toArray(SignedVote[]::new);

BlockHeader finalizedBlockHeader = selectedGrandpaRound.getFinalizedBlockHeader();

CatchUpResMessage catchUpResMessage = CatchUpResMessage.builder()
.round(selectedGrandpaRound.getRoundNumber())
.setId(setId)
.preCommits(preCommits)
.preVotes(preVotes)
.blockHash(finalizedBlockHeader.getHash())
.blockNumber(finalizedBlockHeader.getBlockNumber())
.build();

messageCoordinator.sendCatchUpResponseToPeer(peerId, catchUpResMessage);
}

private Optional<GrandpaRound> selectRound(BigInteger peerRoundNumber, BigInteger peerSetId) {
GrandpaRound round = roundCache.getLatestRound(setId);

while (round != null) {
// Round found
// Check voter set
if (round.getRoundNumber().equals(peerRoundNumber)) {
if (round.getCommitMessagesArchive().getFirst().getSetId().equals(peerSetId)) {
break;
}
}
// Go to the previous round
round = round.getPrevious();
}

return Optional.ofNullable(round);
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/limechain/network/ConnectionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.limechain.network.dto.PeerInfo;
import com.limechain.network.dto.ProtocolStreamType;
import com.limechain.network.dto.ProtocolStreams;
import com.limechain.network.protocol.blockannounce.NodeRole;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshake;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import com.limechain.network.protocol.warp.dto.BlockHeader;
Expand Down Expand Up @@ -252,4 +253,8 @@ private void closeProtocolStream(final ProtocolStreams streams) {
streams.getResponder().close();
}
}

public boolean checkIfPeerIsAuthorNode(PeerId peerId) {
return NodeRole.AUTHORING.getValue().equals(getPeerInfo(peerId).getNodeRole());
}
}
14 changes: 12 additions & 2 deletions src/main/java/com/limechain/network/PeerMessageCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import com.limechain.network.protocol.blockannounce.NodeRole;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceMessage;
import com.limechain.network.protocol.blockannounce.scale.BlockAnnounceMessageScaleWriter;
import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage;
import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleWriter;
import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage;
import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessageScaleWriter;
import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage;
import com.limechain.network.protocol.grandpa.messages.commit.CommitMessageScaleWriter;
import com.limechain.network.protocol.grandpa.messages.vote.VoteMessage;
Expand Down Expand Up @@ -113,8 +117,14 @@ public void sendCommitMessageToPeers(CommitMessage commitMessage) {
});
}

public void sendCatchUpRequestToPeer(PeerId peerId) {
network.getGrandpaService().sendCatchUpRequest(network.getHost(), peerId);
public void sendCatchUpRequestToPeer(PeerId peerId, CatchUpReqMessage catchUpReqMessage) {
byte[] scaleMessage = ScaleUtils.Encode.encode(CatchUpReqMessageScaleWriter.getInstance(), catchUpReqMessage);
network.getGrandpaService().sendCatchUpRequest(network.getHost(), peerId, scaleMessage);
}

public void sendCatchUpResponseToPeer(PeerId peerId, CatchUpResMessage catchUpResMessage) {
byte[] scaleMessage = ScaleUtils.Encode.encode(CatchUpResMessageScaleWriter.getInstance(), catchUpResMessage);
network.getGrandpaService().sendCatchUpResponse(network.getHost(), peerId, scaleMessage);
}

public void sendVoteMessageToPeers(VoteMessage voteMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,15 @@ public void sendCommitMessage(byte[] encodedCommitMessage) {
/**
* Sends a catch-up request message over the controller stream.
*/
public void sendCatchUpRequest() {
engine.writeCatchUpRequest(stream, stream.remotePeerId());
public void sendCatchUpRequest(byte[] encodedCatchUpReqMessage) {
engine.writeCatchUpRequest(stream, encodedCatchUpReqMessage);
}

/**
* Sends a catch-up response message over the controller stream.
*/
public void sendCatchUpResponse(byte[] encodedCatchUpResMessage) {
engine.writeCatchUpResponse(stream, encodedCatchUpResMessage);
}

/**
Expand All @@ -47,5 +54,4 @@ public void sendCatchUpRequest() {
public void sendVoteMessage(byte[] encodedVoteMessage) {
engine.writeCommitMessage(stream, encodedVoteMessage);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
import com.limechain.grandpa.GrandpaService;
import com.limechain.grandpa.state.GrandpaSetState;
import com.limechain.network.ConnectionManager;
import com.limechain.network.protocol.blockannounce.NodeRole;
import com.limechain.network.protocol.blockannounce.messages.BlockAnnounceHandshakeBuilder;
import com.limechain.network.protocol.grandpa.messages.GrandpaMessageType;
import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessage;
import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleReader;
import com.limechain.network.protocol.grandpa.messages.catchup.req.CatchUpReqMessageScaleWriter;
import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpMessage;
import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpMessageScaleReader;
import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessage;
import com.limechain.network.protocol.grandpa.messages.catchup.res.CatchUpResMessageScaleReader;
import com.limechain.network.protocol.grandpa.messages.commit.CommitMessage;
import com.limechain.network.protocol.grandpa.messages.commit.CommitMessageScaleReader;
import com.limechain.network.protocol.grandpa.messages.neighbour.NeighbourMessage;
Expand Down Expand Up @@ -151,8 +149,8 @@ private void handleNeighbourMessage(byte[] message, PeerId peerId) {
log.log(Level.FINE, "Received neighbour message from Peer " + peerId + "\n" + neighbourMessage);
new Thread(() -> warpSyncState.syncNeighbourMessage(neighbourMessage, peerId)).start();

if (NodeRole.AUTHORING.getValue().equals(connectionManager.getPeerInfo(peerId).getNodeRole())) {
warpSyncState.checkAndInitiateCatchUp(neighbourMessage, peerId);
if (AbstractState.isActiveAuthority() && connectionManager.checkIfPeerIsAuthorNode(peerId)) {
grandpaSetState.initiateAndSendCatchUpRequest(neighbourMessage, peerId);
}
}

Expand All @@ -174,15 +172,18 @@ private void handleCommitMessage(byte[] message, PeerId peerId) {
private void handleCatchupRequestMessage(byte[] message, PeerId peerId) {
ScaleCodecReader reader = new ScaleCodecReader(message);
CatchUpReqMessage catchUpReqMessage = reader.read(CatchUpReqMessageScaleReader.getInstance());
//todo: handle catchup req message (authoring node responsibility)
log.log(Level.INFO, "Received catch up request message from Peer " + peerId + "\n" + catchUpReqMessage);

if (AbstractState.isActiveAuthority() && connectionManager.checkIfPeerIsAuthorNode(peerId)) {
grandpaSetState.initiateAndSendCatchUpResponse(peerId, catchUpReqMessage, connectionManager::getPeerIds);
}
}

private void handleCatchupResponseMessage(byte[] message, PeerId peerId) {
ScaleCodecReader reader = new ScaleCodecReader(message);
CatchUpMessage catchUpMessage = reader.read(CatchUpMessageScaleReader.getInstance());
CatchUpResMessage catchUpResMessage = reader.read(CatchUpResMessageScaleReader.getInstance());
//todo: handle catchup res message (authoring node responsibility)
log.log(Level.INFO, "Received catch up message from Peer " + peerId + "\n" + catchUpMessage);
log.log(Level.INFO, "Received catch up response message from Peer " + peerId + "\n" + catchUpResMessage);
}

/**
Expand Down Expand Up @@ -214,7 +215,7 @@ public void writeNeighbourMessage(Stream stream, PeerId peerId) {
throw new ScaleEncodingException(e);
}

log.log(Level.FINE, "Sending neighbour message to peer " + peerId);
log.log(Level.FINE, "Sending neighbour message to Peer " + peerId);
stream.writeAndFlush(buf.toByteArray());
}

Expand All @@ -225,29 +226,30 @@ public void writeNeighbourMessage(Stream stream, PeerId peerId) {
* @param encodedCommitMessage scale encoded CommitMessage object
*/
public void writeCommitMessage(Stream stream, byte[] encodedCommitMessage) {
log.log(Level.FINE, "Sending commit message to peer " + stream.remotePeerId());
log.log(Level.FINE, "Sending commit message to Peer " + stream.remotePeerId());
stream.writeAndFlush(encodedCommitMessage);
}

/**
* Send our GRANDPA catch-up request message on a given <b>responder</b> stream.
*
* @param stream <b>responder</b> stream to write the message to
* @param peerId peer to send to
* @param stream <b>responder</b> stream to write the message to
* @param encodedCatchUpReqMessage scale encoded CatchUpRequestMessage object
*/
public void writeCatchUpRequest(Stream stream, PeerId peerId) {
ByteArrayOutputStream buf = new ByteArrayOutputStream();
try (ScaleCodecWriter writer = new ScaleCodecWriter(buf)) {
writer.write(
CatchUpReqMessageScaleWriter.getInstance(),
ProtocolMessageBuilder.buildCatchUpRequestMessage()
);
} catch (IOException e) {
throw new ScaleEncodingException(e);
}
public void writeCatchUpRequest(Stream stream, byte[] encodedCatchUpReqMessage) {
log.log(Level.FINE, "Sending catch up request to Peer " + stream.remotePeerId());
stream.writeAndFlush(encodedCatchUpReqMessage);
}

log.log(Level.FINE, "Sending catch up request to Peer " + peerId);
stream.writeAndFlush(buf.toByteArray());
/**
* Send our GRANDPA catch-up response message on a given <b>responder</b> stream.
*
* @param stream <b>responder</b> stream to write the message to
* @param encodedCatchUpResMessage scale encoded CatchUpResMessage object
*/
public void writeCatchUpResponse(Stream stream, byte[] encodedCatchUpResMessage) {
log.log(Level.FINE, "Sending catch up response to Peer " + stream.remotePeerId());
stream.writeAndFlush(encodedCatchUpResMessage);
}

/**
Expand Down
Loading

0 comments on commit eae0c4f

Please sign in to comment.