diff --git a/consensus/reactor.go b/consensus/reactor.go index f0c1937c45..ce5ce90b7c 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -353,8 +353,12 @@ func (conR *Reactor) ReceiveEnvelope(e p2p.Envelope) { case *VoteMessage: cs := conR.conS cs.mtx.RLock() - height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() + height, round, valSize, lastCommitSize := cs.Height, cs.Round, + cs.Validators.Size(), cs.LastCommit.Size() cs.mtx.RUnlock() + + schema.WriteVote(conR.traceClient, height, round, msg.Vote, e.Src.ID(), schema.TransferTypeDownload) + ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) @@ -762,7 +766,7 @@ OUTER_LOOP: // Special catchup logic. // If peer is lagging by height 1, send LastCommit. if prs.Height != 0 && rs.Height == prs.Height+1 { - if ps.PickSendVote(rs.LastCommit) { + if conR.pickSendVoteAndTrace(rs.LastCommit, rs, ps) { logger.Debug("Picked rs.LastCommit to send", "height", prs.Height) continue OUTER_LOOP } @@ -775,8 +779,11 @@ OUTER_LOOP: // Load the block commit for prs.Height, // which contains precommit signatures for prs.Height. if commit := conR.conS.blockStore.LoadBlockCommit(prs.Height); commit != nil { - if ps.PickSendVote(commit) { + vote := ps.PickSendVote(commit) + if vote != nil { logger.Debug("Picked Catchup commit to send", "height", prs.Height) + schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote, + ps.peer.ID(), schema.TransferTypeUpload) continue OUTER_LOOP } } @@ -798,6 +805,18 @@ OUTER_LOOP: } } +// pickSendVoteAndTrace picks a vote to send and traces it. +// It returns true if a vote is sent. +// Note that it is a wrapper around PickSendVote with the addition of tracing the vote. +func (conR *Reactor) pickSendVoteAndTrace(votes types.VoteSetReader, rs *cstypes.RoundState, ps *PeerState) bool { + vote := ps.PickSendVote(votes) + if vote != nil { // if a vote is sent, trace it + schema.WriteVote(conR.traceClient, rs.Height, rs.Round, vote, + ps.peer.ID(), schema.TransferTypeUpload) + return true + } + return false +} func (conR *Reactor) gossipVotesForHeight( logger log.Logger, rs *cstypes.RoundState, @@ -807,7 +826,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are lastCommits to send... if prs.Step == cstypes.RoundStepNewHeight { - if ps.PickSendVote(rs.LastCommit) { + if conR.pickSendVoteAndTrace(rs.LastCommit, rs, ps) { logger.Debug("Picked rs.LastCommit to send") return true } @@ -815,7 +834,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are POL prevotes to send... if prs.Step <= cstypes.RoundStepPropose && prs.Round != -1 && prs.Round <= rs.Round && prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { + if conR.pickSendVoteAndTrace(polPrevotes, rs, ps) { logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) return true @@ -824,21 +843,21 @@ func (conR *Reactor) gossipVotesForHeight( } // If there are prevotes to send... if prs.Step <= cstypes.RoundStepPrevoteWait && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + if conR.pickSendVoteAndTrace(rs.Votes.Prevotes(prs.Round), rs, ps) { logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) return true } } // If there are precommits to send... if prs.Step <= cstypes.RoundStepPrecommitWait && prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Precommits(prs.Round)) { + if conR.pickSendVoteAndTrace(rs.Votes.Precommits(prs.Round), rs, ps) { logger.Debug("Picked rs.Precommits(prs.Round) to send", "round", prs.Round) return true } } // If there are prevotes to send...Needed because of validBlock mechanism if prs.Round != -1 && prs.Round <= rs.Round { - if ps.PickSendVote(rs.Votes.Prevotes(prs.Round)) { + if conR.pickSendVoteAndTrace(rs.Votes.Prevotes(prs.Round), rs, ps) { logger.Debug("Picked rs.Prevotes(prs.Round) to send", "round", prs.Round) return true } @@ -846,7 +865,7 @@ func (conR *Reactor) gossipVotesForHeight( // If there are POLPrevotes to send... if prs.ProposalPOLRound != -1 { if polPrevotes := rs.Votes.Prevotes(prs.ProposalPOLRound); polPrevotes != nil { - if ps.PickSendVote(polPrevotes) { + if conR.pickSendVoteAndTrace(polPrevotes, rs, ps) { logger.Debug("Picked rs.Prevotes(prs.ProposalPOLRound) to send", "round", prs.ProposalPOLRound) return true @@ -1163,8 +1182,8 @@ func (ps *PeerState) SetHasProposalBlockPart(height int64, round int32, index in } // PickSendVote picks a vote and sends it to the peer. -// Returns true if vote was sent. -func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { +// Returns the vote if vote was sent. Otherwise, returns nil. +func (ps *PeerState) PickSendVote(votes types.VoteSetReader) *types.Vote { if vote, ok := ps.PickVoteToSend(votes); ok { ps.logger.Debug("Sending vote message", "ps", ps, "vote", vote) if p2p.SendEnvelopeShim(ps.peer, p2p.Envelope{ //nolint: staticcheck @@ -1174,11 +1193,11 @@ func (ps *PeerState) PickSendVote(votes types.VoteSetReader) bool { }, }, ps.logger) { ps.SetHasVote(vote) - return true + return vote } - return false + return nil } - return false + return nil } // PickVoteToSend picks a vote to send to the peer. diff --git a/pkg/trace/schema/consensus.go b/pkg/trace/schema/consensus.go index 3fea523853..e22b658c32 100644 --- a/pkg/trace/schema/consensus.go +++ b/pkg/trace/schema/consensus.go @@ -14,6 +14,7 @@ func ConsensusTables() []string { RoundStateTable, BlockPartsTable, BlockTable, + VoteTable, } } @@ -128,3 +129,54 @@ func WriteBlock(client *trace.Client, block *types.Block, size int) { LastCommitRoundFieldKey: block.LastCommit.Round, }) } + +// Schema constants for the consensus votes tracing database. +const ( + // VoteTable is the name of the table that stores the consensus + // voting traces. Follows this schema: + // + // | time | height | round | vote_type | vote_height | vote_round + // | vote_block_id| vote_unix_millisecond_timestamp + // | vote_validator_address | vote_validator_index | peer + // | transfer_type | + VoteTable = "consensus_vote" + + VoteTypeFieldKey = "vote_type" + VoteHeightFieldKey = "vote_height" + VoteRoundFieldKey = "vote_round" + VoteBlockIDFieldKey = "vote_block_id" + VoteTimestampFieldKey = "vote_unix_millisecond_timestamp" + ValidatorAddressFieldKey = "vote_validator_address" + ValidatorIndexFieldKey = "vote_validator_index" +) + +// WriteVote writes a tracing point for a vote using the predetermined +// schema for consensus vote tracing. +// This is used to create a table in the following +// schema: +// +// | time | height | round | vote_type | vote_height | vote_round +// | vote_block_id| vote_unix_millisecond_timestamp +// | vote_validator_address | vote_validator_index | peer +// | transfer_type | +func WriteVote(client *trace.Client, + height int64, // height of the current peer when it received/sent the vote + round int32, // round of the current peer when it received/sent the vote + vote *types.Vote, // vote received by the current peer + peer p2p.ID, // the peer from which it received the vote or the peer to which it sent the vote + transferType string, // download (received) or upload(sent) +) { + client.WritePoint(VoteTable, map[string]interface{}{ + HeightFieldKey: height, + RoundFieldKey: round, + VoteTypeFieldKey: vote.Type.String(), + VoteHeightFieldKey: vote.Height, + VoteRoundFieldKey: vote.Round, + VoteBlockIDFieldKey: vote.BlockID.Hash.String(), + VoteTimestampFieldKey: vote.Timestamp.UnixMilli(), + ValidatorAddressFieldKey: vote.ValidatorAddress.String(), + ValidatorIndexFieldKey: vote.ValidatorIndex, + PeerFieldKey: peer, + TransferTypeFieldKey: transferType, + }) +} diff --git a/pkg/trace/schema/tables.go b/pkg/trace/schema/tables.go index 2c8c9ef97d..11b106e02c 100644 --- a/pkg/trace/schema/tables.go +++ b/pkg/trace/schema/tables.go @@ -20,7 +20,8 @@ const ( // value. PeerFieldKey = "peer" - // TransferTypeFieldKey is the tracing field key for the class of a tx. + // TransferTypeFieldKey is the tracing field key for the class of a tx + // and votes. TransferTypeFieldKey = "transfer_type" // TransferTypeDownload is a tracing field value for receiving some