Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prototype: delayed precommit #1431

Draft
wants to merge 54 commits into
base: v0.34.x-celestia
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
5a90ec5
delays precommits
staheri14 Jul 25, 2024
86b556d
calculates delay based on 11s after start time
staheri14 Jul 26, 2024
7a727d7
minor replacement
staheri14 Jul 26, 2024
9ee8842
Merge branch 'v0.34.x-celestia' into sanaz/delayed-precommit
staheri14 Jul 26, 2024
710b5c9
increases timeout in the test to account for the delayed precommit
staheri14 Jul 26, 2024
064b55b
Merge branch 'v0.34.x-celestia' into sanaz/delayed-precommit
staheri14 Jul 26, 2024
691fd4e
increases the waittime based on the expected block time
staheri14 Jul 26, 2024
6b9cdf5
fixes the TestProvider
staheri14 Jul 26, 2024
b4c184b
increases waittime for TestApp_Tx
staheri14 Jul 26, 2024
6120f68
increases to block time
staheri14 Jul 26, 2024
8d67ff1
fixes failure in TestMempoolNoProgressUntilTxsAvailable
staheri14 Jul 26, 2024
c009f22
fixes TestHeaderEvents
staheri14 Jul 26, 2024
91a0bd5
adds a todo
staheri14 Jul 26, 2024
c1f8105
adds a todo
staheri14 Jul 26, 2024
ed4bd1c
adds todos and reminders
staheri14 Jul 26, 2024
2598052
adds a todo to fix TestBroadcastTx
staheri14 Jul 26, 2024
9d6925e
extends contex timeout to make the TestBroadcastTx pass
staheri14 Jul 26, 2024
d44bb3b
increases TimeoutBroadcastTxCommit
staheri14 Jul 26, 2024
35f921b
fixes TestNodeStartStop by taking the new block time into account
staheri14 Jul 26, 2024
39ace76
fixes TestReactorVotingPowerChange
staheri14 Jul 26, 2024
5ddeaf3
accounts for block time variance in TestApp_Tx
staheri14 Jul 29, 2024
7ca9f7c
clarifies comments about TimeoutBroadcastTxCommit
staheri14 Jul 29, 2024
65e5442
sets wait time higher for the sake of CIs
staheri14 Jul 29, 2024
8a90067
adjust waiting time if higher than 11s
staheri14 Jul 29, 2024
a7468f6
traces precommit time table
staheri14 Jul 31, 2024
8095e52
adds PRecommitTimeTable to the consensus tables
staheri14 Jul 31, 2024
de810b1
adds consensus prefix to the table name
staheri14 Jul 31, 2024
c544f8a
does not adjust wait time
staheri14 Jul 31, 2024
b73fb1a
adds a comment
staheri14 Aug 6, 2024
9b7daab
captures the time where a new proposal arrives
staheri14 Aug 6, 2024
35a0e3e
traces when a height starts by cs.StartTime or by the arrival of a ne…
staheri14 Aug 7, 2024
fd04b4f
traces the start time of each height
staheri14 Aug 8, 2024
c44cdc4
adds an alternative implementation
staheri14 Aug 9, 2024
05ac22e
traces the proposer
staheri14 Aug 21, 2024
328db7c
Merge remote-tracking branch 'origin/v0.34.x-celestia' into sanaz/del…
staheri14 Aug 21, 2024
d5037d6
traces precommit time in the new implementation
staheri14 Aug 21, 2024
ab97854
updates go version to 1.22.6
staheri14 Aug 26, 2024
3d130ac
Merge branch 'sanaz/bumps-to-go1.22.6' into sanaz/delayed-precommit
staheri14 Aug 26, 2024
8e974ca
increases delay amount to 14s
staheri14 Aug 27, 2024
a7c5cdf
Merge remote-tracking branch 'origin/v0.34.x-celestia' into sanaz/del…
staheri14 Aug 27, 2024
19fa595
increases TimeoutBroadcastTxCommit
staheri14 Aug 27, 2024
5bdedc8
sets delay to 4 s higher than timeout propose
staheri14 Aug 30, 2024
af3a91f
accounts for block propagation time
staheri14 Aug 30, 2024
e0ae827
adds the ability to read delay from env var
staheri14 Aug 30, 2024
be9a1e1
adds log
staheri14 Aug 30, 2024
6bb8a76
removes the debugging logs
staheri14 Aug 31, 2024
8a52745
adds another log
staheri14 Aug 31, 2024
2d66356
fixes duplicate conversion to seconds
staheri14 Aug 31, 2024
cfaf71e
minor update
staheri14 Aug 31, 2024
535046e
deletes stale logs and comments
staheri14 Aug 31, 2024
dd2274c
adds a log about the amount of precommit delay
staheri14 Aug 31, 2024
69dc0fd
traces validator address in earlier steps
staheri14 Sep 9, 2024
45de508
logs when the proposer is traced
staheri14 Sep 10, 2024
9198700
traces both validator's address and the proposer address
staheri14 Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,9 @@ func DefaultRPCConfig() *RPCConfig {
MaxSubscriptionClients: 100,
MaxSubscriptionsPerClient: 5,
SubscriptionBufferSize: defaultSubscriptionBufferSize,
TimeoutBroadcastTxCommit: 10 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,
// this value is changed to align with the new consistent block time of 12ish seconds + 3 seconds block time variance (15 seconds)
TimeoutBroadcastTxCommit: 20 * time.Second,
WebSocketWriteBufferSize: defaultSubscriptionBufferSize,

MaxBodyBytes: int64(1000000), // 1MB
MaxHeaderBytes: 1 << 20, // same as the net/http default
Expand Down
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
assert.Equal(t, prevoteHeight, ev.Height())
}
}
case <-time.After(20 * time.Second):
case <-time.After(1 * time.Minute):
for i, reactor := range reactors {
t.Logf("Consensus Reactor %d\n%v", i, reactor)
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type cleanupFunc func()
var (
config *cfg.Config // NOTE: must be reset for each _test.go file
consensusReplayConfig *cfg.Config
ensureTimeout = time.Millisecond * 200
ensureTimeout = time.Second * 12 // time.Millisecond * 200
)

func ensureDir(dir string, mode os.FileMode) {
Expand Down
18 changes: 18 additions & 0 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package consensus
import (
"errors"
"fmt"
blog "log"
"os"
"reflect"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -36,6 +39,11 @@ const (
votesToContributeToBecomeGoodPeer = 10000
)

var (
stateKey = []byte("stateKey")
PrecommitDelay = 11 * time.Second
)

//-----------------------------------------------------------------------------

// Reactor defines a reactor for the consensus service.
Expand Down Expand Up @@ -1712,6 +1720,16 @@ func init() {
cmtjson.RegisterType(&HasVoteMessage{}, "tendermint/HasVote")
cmtjson.RegisterType(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23")
cmtjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits")

delay := os.Getenv("PRECOMMIT_DELAY")
if delay != "" {
parsedDelay, err := strconv.Atoi(delay)
if err == nil {
PrecommitDelay = time.Duration(parsedDelay) * time.Second
}
}
blog.Println("Precommit delay is set to",
PrecommitDelay.Seconds(), " seconds")
}

//-------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions consensus/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,9 +709,9 @@ func timeoutWaitGroup(t *testing.T, n int, f func(int), css []*State) {
close(done)
}()

// we're running many nodes in-process, possibly in in a virtual machine,
// we're running many nodes in-process, possibly in a virtual machine,
// and spewing debug messages - making a block could take a while,
timeout := time.Second * 120
timeout := (12 * time.Second) * 120

select {
case <-done:
Expand Down
63 changes: 56 additions & 7 deletions consensus/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,17 @@ type State struct {
privValidatorPubKey crypto.PubKey

// state changes may be triggered by: msgs from peers,
// msgs from ourself, or by timeouts
// msgs from ourselves, or by timeouts
peerMsgQueue chan msgInfo
internalMsgQueue chan msgInfo
timeoutTicker TimeoutTicker

// information about about added votes and block parts are written on this channel
// information about added votes and block parts are written on this channel
// so statistics can be computed by reactor
statsMsgQueue chan msgInfo

// we use eventBus to trigger msg broadcasts in the reactor,
// and to notify external subscribers, eg. through a websocket
// and to notify external subscribers, e.g., through a websocket
eventBus *types.EventBus

// a Write-Ahead Log ensures we can recover from any kind of crash
Expand Down Expand Up @@ -842,7 +842,7 @@ func (cs *State) handleMsg(mi msgInfo) {
// if the proposal is complete, we'll enterPrevote or tryFinalizeCommit
added, err = cs.addProposalBlockPart(msg, peerID)

// We unlock here to yield to any routines that need to read the the RoundState.
// We unlock here to yield to any routines that need to read the RoundState.
// Previously, this code held the lock from the point at which the final block
// part was received until the block executed against the application.
// This prevented the reactor from being able to retrieve the most updated
Expand Down Expand Up @@ -915,6 +915,10 @@ func (cs *State) handleMsg(mi msgInfo) {

func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
cs.Logger.Debug("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step)
if ti.Height == rs.Height {
schema.WriteRoundState(cs.traceClient, ti.Height, ti.Round,
schema.StartTimeIsReached)
}

// timeouts must be for current height, round, step
if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) {
Expand Down Expand Up @@ -1010,6 +1014,11 @@ func (cs *State) enterNewRound(height int64, round int32) {
return
}

if cs.Round == round && cs.Step == cstypes.RoundStepNewHeight {
schema.WriteRoundState(cs.traceClient, cs.Height, cs.Round,
schema.NewHeightByStartTime)
}

if now := cmttime.Now(); cs.StartTime.After(now) {
logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now)
}
Expand Down Expand Up @@ -1099,12 +1108,13 @@ func (cs *State) enterPropose(height int64, round int32) {
return
}

logger.Debug("entering propose step", "current", log.NewLazySprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
logger.Info("entering propose step", "current",
log.NewLazySprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))

defer func() {
// Done enterPropose:
cs.updateRoundStep(round, cstypes.RoundStepPropose)
cs.newStep()
cs.newStep() // announce the new step

// If we have the whole proposal + POL, then goto Prevote now.
// else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
Expand Down Expand Up @@ -1133,6 +1143,9 @@ func (cs *State) enterPropose(height int64, round int32) {
}

address := cs.privValidatorPubKey.Address()
logger.Info("attempting to trace proposer", "addr", address.String())
schema.WriteProposer(cs.traceClient, height, round, address.String(),
cs.Validators.GetProposer().Address.String())

// if not a validator, we're done
if !cs.Validators.HasAddress(address) {
Expand All @@ -1141,6 +1154,7 @@ func (cs *State) enterPropose(height int64, round int32) {
}

if cs.isProposer(address) {
// trace if the node is the proposer
logger.Debug("propose step; our turn to propose", "proposer", address)
cs.decideProposal(height, round)
} else {
Expand Down Expand Up @@ -1255,7 +1269,7 @@ func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.Pa
// Enter: `timeoutPropose` after entering Propose.
// Enter: proposal block and POL is ready.
// Prevote for LockedBlock if we're locked, or ProposalBlock if valid.
// Otherwise vote nil.
// Otherwise, vote nil.
func (cs *State) enterPrevote(height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round)

Expand Down Expand Up @@ -1366,6 +1380,15 @@ func (cs *State) enterPrevoteWait(height int64, round int32) {
cs.scheduleTimeout(cs.config.Prevote(round), height, round, cstypes.RoundStepPrevoteWait)
}

// isReadyToPrecommit calculates if the process has waited at least 11 seconds
// from their start time before they can vote
func (cs *State) isReadyToPrecommit(height int64, round int32) (bool, time.Duration) {
precommitVoteTime := cs.StartTime.Add(PrecommitDelay)
waitTime := time.Until(precommitVoteTime)
schema.WritePrecommitTime(cs.traceClient, height, round, waitTime.Seconds())
return waitTime <= 0, waitTime
}

// Enter: `timeoutPrevote` after any +2/3 prevotes.
// Enter: `timeoutPrecommit` after any +2/3 precommits.
// Enter: +2/3 precomits for block or nil.
Expand All @@ -1383,6 +1406,12 @@ func (cs *State) enterPrecommit(height int64, round int32) {
return
}

if ready, waitTime := cs.isReadyToPrecommit(height, round); !ready {
// this will reenter precommit after the waitTime
cs.scheduleTimeout(waitTime, height, round, cstypes.RoundStepPrevoteWait)
return
}

logger.Debug("entering precommit step", "current", log.NewLazySprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))

defer func() {
Expand Down Expand Up @@ -1898,6 +1927,7 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error {

proposal.Signature = p.Signature
cs.Proposal = proposal
schema.WriteRoundState(cs.traceClient, cs.Height, cs.Round, schema.NewProposalArrived)
// We don't update cs.ProposalBlockParts if it is already set.
// This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round.
// TODO: We can check if Proposal is for a different block as this is a sign of misbehavior!
Expand Down Expand Up @@ -2004,6 +2034,7 @@ func (cs *State) handleCompleteProposal(blockHeight int64) {
// procedure at this point.
}

// I think this happens before the cs.StartTime starts.
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
cs.enterPrevote(blockHeight, cs.Round)
Expand Down Expand Up @@ -2319,6 +2350,24 @@ func (cs *State) signAddVote(msgType cmtproto.SignedMsgType, hash []byte, header
return nil
}

//if msgType == cmtproto.PrecommitType {
// targetBlockTime := 11 * time.Second
// precommitVoteTime := cs.StartTime.Add(targetBlockTime)
// waitTime := precommitVoteTime.Sub(cmttime.Now())
// schema.WritePrecommitTime(cs.traceClient, cs.Height, cs.Round, waitTime.Seconds())
// if waitTime > 0 {
// //if waitTime > 11*time.Second {
// // cs.Logger.Debug("waiting for precommit vote was higher than"+
// // " expected", "height", cs.Height, "round", cs.Round,
// // "waitTime", waitTime)
// // time.Sleep(11 * time.Second)
// //} else {
// // time.Sleep(waitTime)
// //}
// time.Sleep(waitTime)
//
// }
//}
// TODO: pass pubKey to signVote
vote, err := cs.signVote(msgType, hash, header)
if err == nil {
Expand Down
8 changes: 7 additions & 1 deletion light/provider/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ func TestProvider(t *testing.T) {
require.Nil(t, lb)
assert.Equal(t, provider.ErrHeightTooHigh, err)

_, err = p.LightBlock(context.Background(), 1)
// when we reach here, height 1 is expected to be pruned
// however, due to the delayed precommit logic which makes the node to
// sleep to match the 12s block time,
// it seems the pruning does not happen immediately,
// so adding a sleep here to see if that fixes the issue
time.Sleep(12 * 10 * time.Second)
lb, err = p.LightBlock(context.Background(), 1)
require.Error(t, err)
require.Nil(t, lb)
assert.Equal(t, provider.ErrLightBlockNotFound, err)
Expand Down
3 changes: 2 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ func TestNodeStartStop(t *testing.T) {
// wait for the node to produce a block
blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock)
require.NoError(t, err)
blockTime := 12 * time.Second
select {
case <-blocksSub.Out():
case <-blocksSub.Cancelled():
t.Fatal("blocksSub was cancelled")
case <-time.After(10 * time.Second):
case <-time.After(blockTime):
t.Fatal("timed out waiting for the node to produce a block")
}

Expand Down
47 changes: 46 additions & 1 deletion pkg/trace/schema/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,59 @@ func ConsensusTables() []string {
VoteTable,
ConsensusStateTable,
ProposalTable,
PrecommitTimeTable,
Proposer,
}
}

const (
// PrecommitTimeTable
PrecommitTimeTable = "consensus_precommit_time"
)

type PrecommitTime struct {
Height int64 `json:"height"`
Round int32 `json:"round"`
// amount of delay added before casting precommit for this Height and Round
Delay float64 `json:"delay"`
}

func (p PrecommitTime) Table() string {
return PrecommitTimeTable
}

func WritePrecommitTime(client trace.Tracer, height int64, round int32, delay float64) {
client.Write(PrecommitTime{Height: height, Round: round, Delay: delay})

}

const Proposer = "consensus_proposer"

type ProposerInfo struct {
Height int64 `json:"height"`
Round int32 `json:"round"`
Self string `json:"self"`
Proposer string `json:"address"`
}

func (p ProposerInfo) Table() string {
return Proposer

}

func WriteProposer(client trace.Tracer, height int64, round int32,
self string, proposer string) {
client.Write(ProposerInfo{Height: height, Round: round, Self: self, Proposer: proposer})
}

// Schema constants for the consensus round state tracing database.
const (
// RoundStateTable is the name of the table that stores the consensus
// state traces.
RoundStateTable = "consensus_round_state"
RoundStateTable = "consensus_round_state"
NewProposalArrived = uint8(100)
NewHeightByStartTime = uint8(101)
StartTimeIsReached = uint8(102)
)

// RoundState describes schema for the "consensus_round_state" table.
Expand Down
3 changes: 2 additions & 1 deletion rpc/client/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
"github.com/tendermint/tendermint/types"
)

var waitForEventTimeout = 8 * time.Second
var blockTime = 12 * time.Second
var waitForEventTimeout = 8*time.Second + blockTime

// MakeTxKV returns a text transaction, allong with expected key, value pair
func MakeTxKV() ([]byte, []byte, []byte) {
Expand Down
1 change: 1 addition & 0 deletions rpc/core/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (

// SubscribeTimeout is the maximum time we wait to subscribe for an event.
// must be less than the server's write timeout (see rpcserver.DefaultConfig)
// TODO this may need adjustment
SubscribeTimeout = 5 * time.Second

// genesisChunkSize is the maximum size, in bytes, of each
Expand Down
5 changes: 4 additions & 1 deletion rpc/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand All @@ -25,8 +26,10 @@ func TestMain(m *testing.M) {
}

func TestBroadcastTx(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
res, err := rpctest.GetGRPCClient().BroadcastTx(
context.Background(),
ctx,
&core_grpc.RequestBroadcastTx{Tx: []byte("this is a tx")},
)
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions test/e2e/runner/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func Benchmark(testnet *e2e.Testnet, benchmarkLength int64) error {

// wait for the length of the benchmark period in blocks to pass. We allow 5 seconds for each block
// which should be sufficient.
// TODO the waitingTime may need some adjustment e.g.,
// waitingTime := time.Duration(benchmarkLength*5) * 12 * time.Second
waitingTime := time.Duration(benchmarkLength*5) * time.Second
endHeight, err := waitForAllNodes(testnet, block.Height+benchmarkLength, waitingTime)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion test/e2e/runner/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ func WaitUntil(testnet *e2e.Testnet, height int64) error {
// waitingTime estimates how long it should take for a node to reach the height.
// More nodes in a network implies we may expect a slower network and may have to wait longer.
func waitingTime(nodes int) time.Duration {
return time.Duration(20+(nodes*2)) * time.Second
blockTime := 12 * time.Second
// not sure what is the role of 20 in this formula?
return time.Duration(20+(nodes*2)) * blockTime
}
Loading
Loading