diff --git a/config/config.go b/config/config.go index d5e0312551..3180b46f8e 100644 --- a/config/config.go +++ b/config/config.go @@ -433,8 +433,9 @@ func DefaultRPCConfig() *RPCConfig { MaxSubscriptionClients: 100, MaxSubscriptionsPerClient: 5, SubscriptionBufferSize: defaultSubscriptionBufferSize, - TimeoutBroadcastTxCommit: 10 * time.Second, - WebSocketWriteBufferSize: defaultSubscriptionBufferSize, + // raised from 10s to 20s to stay above the new ~12s block time plus ~3s of block time variance (~15s total) + TimeoutBroadcastTxCommit: 20 * time.Second, + WebSocketWriteBufferSize: defaultSubscriptionBufferSize, MaxBodyBytes: int64(1000000), // 1MB MaxHeaderBytes: 1 << 20, // same as the net/http default diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 03b94e1856..02fc3ba1cb 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -305,7 +305,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { assert.Equal(t, prevoteHeight, ev.Height()) } } - case <-time.After(20 * time.Second): + case <-time.After(1 * time.Minute): for i, reactor := range reactors { t.Logf("Consensus Reactor %d\n%v", i, reactor) } diff --git a/consensus/common_test.go b/consensus/common_test.go index 511b004ea6..9d92351caf 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -53,7 +53,7 @@ type cleanupFunc func() var ( config *cfg.Config // NOTE: must be reset for each _test.go file consensusReplayConfig *cfg.Config - ensureTimeout = time.Millisecond * 200 + ensureTimeout = time.Second * 12 // time.Millisecond * 200 ) func ensureDir(dir string, mode os.FileMode) { diff --git a/consensus/reactor.go b/consensus/reactor.go index eec5afa8d0..534d72421f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -3,7 +3,10 @@ package consensus import ( "errors" "fmt" + blog "log" + "os" "reflect" + "strconv" "sync" "time" @@ -36,6 +39,11 @@ const ( votesToContributeToBecomeGoodPeer = 10000 ) +var ( + 
stateKey = []byte("stateKey") + PrecommitDelay = 11 * time.Second +) + //----------------------------------------------------------------------------- // Reactor defines a reactor for the consensus service. @@ -1712,6 +1720,16 @@ func init() { cmtjson.RegisterType(&HasVoteMessage{}, "tendermint/HasVote") cmtjson.RegisterType(&VoteSetMaj23Message{}, "tendermint/VoteSetMaj23") cmtjson.RegisterType(&VoteSetBitsMessage{}, "tendermint/VoteSetBits") + + delay := os.Getenv("PRECOMMIT_DELAY") + if delay != "" { + parsedDelay, err := strconv.Atoi(delay) + if err == nil { + PrecommitDelay = time.Duration(parsedDelay) * time.Second + } + } + blog.Println("Precommit delay is set to", + PrecommitDelay.Seconds(), " seconds") } //------------------------------------- diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index a82aa38859..b7eac82634 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -709,9 +709,9 @@ func timeoutWaitGroup(t *testing.T, n int, f func(int), css []*State) { close(done) }() - // we're running many nodes in-process, possibly in in a virtual machine, + // we're running many nodes in-process, possibly in a virtual machine, // and spewing debug messages - making a block could take a while, - timeout := time.Second * 120 + timeout := (12 * time.Second) * 120 select { case <-done: diff --git a/consensus/state.go b/consensus/state.go index 35fa46d44d..1d5383bf9c 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -106,17 +106,17 @@ type State struct { privValidatorPubKey crypto.PubKey // state changes may be triggered by: msgs from peers, - // msgs from ourself, or by timeouts + // msgs from ourselves, or by timeouts peerMsgQueue chan msgInfo internalMsgQueue chan msgInfo timeoutTicker TimeoutTicker - // information about about added votes and block parts are written on this channel + // information about added votes and block parts are written on this channel // so statistics can be computed by reactor 
statsMsgQueue chan msgInfo // we use eventBus to trigger msg broadcasts in the reactor, - // and to notify external subscribers, eg. through a websocket + // and to notify external subscribers, e.g., through a websocket eventBus *types.EventBus // a Write-Ahead Log ensures we can recover from any kind of crash @@ -842,7 +842,7 @@ func (cs *State) handleMsg(mi msgInfo) { // if the proposal is complete, we'll enterPrevote or tryFinalizeCommit added, err = cs.addProposalBlockPart(msg, peerID) - // We unlock here to yield to any routines that need to read the the RoundState. + // We unlock here to yield to any routines that need to read the RoundState. // Previously, this code held the lock from the point at which the final block // part was received until the block executed against the application. // This prevented the reactor from being able to retrieve the most updated @@ -915,6 +915,10 @@ func (cs *State) handleMsg(mi msgInfo) { func (cs *State) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) { cs.Logger.Debug("received tock", "timeout", ti.Duration, "height", ti.Height, "round", ti.Round, "step", ti.Step) + if ti.Height == rs.Height { + schema.WriteRoundState(cs.traceClient, ti.Height, ti.Round, + schema.StartTimeIsReached) + } // timeouts must be for current height, round, step if ti.Height != rs.Height || ti.Round < rs.Round || (ti.Round == rs.Round && ti.Step < rs.Step) { @@ -1010,6 +1014,11 @@ func (cs *State) enterNewRound(height int64, round int32) { return } + if cs.Round == round && cs.Step == cstypes.RoundStepNewHeight { + schema.WriteRoundState(cs.traceClient, cs.Height, cs.Round, + schema.NewHeightByStartTime) + } + if now := cmttime.Now(); cs.StartTime.After(now) { logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now) } @@ -1099,12 +1108,13 @@ func (cs *State) enterPropose(height int64, round int32) { return } - logger.Debug("entering propose step", "current", log.NewLazySprintf("%v/%v/%v", 
cs.Height, cs.Round, cs.Step)) + logger.Info("entering propose step", "current", + log.NewLazySprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step)) defer func() { // Done enterPropose: cs.updateRoundStep(round, cstypes.RoundStepPropose) - cs.newStep() + cs.newStep() // announce the new step // If we have the whole proposal + POL, then goto Prevote now. // else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart), @@ -1133,6 +1143,9 @@ func (cs *State) enterPropose(height int64, round int32) { } address := cs.privValidatorPubKey.Address() + logger.Info("attempting to trace proposer", "addr", address.String()) + schema.WriteProposer(cs.traceClient, height, round, address.String(), + cs.Validators.GetProposer().Address.String()) // if not a validator, we're done if !cs.Validators.HasAddress(address) { @@ -1141,6 +1154,7 @@ func (cs *State) enterPropose(height int64, round int32) { } if cs.isProposer(address) { + // trace if the node is the proposer logger.Debug("propose step; our turn to propose", "proposer", address) cs.decideProposal(height, round) } else { @@ -1255,7 +1269,7 @@ func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.Pa // Enter: `timeoutPropose` after entering Propose. // Enter: proposal block and POL is ready. // Prevote for LockedBlock if we're locked, or ProposalBlock if valid. -// Otherwise vote nil. +// Otherwise, vote nil. 
func (cs *State) enterPrevote(height int64, round int32) { logger := cs.Logger.With("height", height, "round", round) @@ -1366,6 +1380,15 @@ func (cs *State) enterPrevoteWait(height int64, round int32) { cs.scheduleTimeout(cs.config.Prevote(round), height, round, cstypes.RoundStepPrevoteWait) } +// isReadyToPrecommit reports whether PrecommitDelay has elapsed since cs.StartTime; +// it also traces and returns the remaining wait, which is <= 0 once ready +func (cs *State) isReadyToPrecommit(height int64, round int32) (bool, time.Duration) { + precommitVoteTime := cs.StartTime.Add(PrecommitDelay) + waitTime := time.Until(precommitVoteTime) + schema.WritePrecommitTime(cs.traceClient, height, round, waitTime.Seconds()) + return waitTime <= 0, waitTime +} + // Enter: `timeoutPrevote` after any +2/3 prevotes. // Enter: `timeoutPrecommit` after any +2/3 precommits. // Enter: +2/3 precomits for block or nil. @@ -1383,6 +1406,12 @@ func (cs *State) enterPrecommit(height int64, round int32) { return } + if ready, waitTime := cs.isReadyToPrecommit(height, round); !ready { + // this will reenter precommit after the waitTime + cs.scheduleTimeout(waitTime, height, round, cstypes.RoundStepPrevoteWait) + return + } + logger.Debug("entering precommit step", "current", log.NewLazySprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step)) defer func() { @@ -1898,6 +1927,7 @@ func (cs *State) defaultSetProposal(proposal *types.Proposal) error { proposal.Signature = p.Signature cs.Proposal = proposal + schema.WriteRoundState(cs.traceClient, cs.Height, cs.Round, schema.NewProposalArrived) // We don't update cs.ProposalBlockParts if it is already set. // This happens if we're already in cstypes.RoundStepCommit or if there is a valid block in the current round. // TODO: We can check if Proposal is for a different block as this is a sign of misbehavior! @@ -2004,6 +2034,7 @@ func (cs *State) handleCompleteProposal(blockHeight int64) { // procedure at this point. } + // I think this happens before the cs.StartTime starts. 
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() { // Move onto the next step cs.enterPrevote(blockHeight, cs.Round) @@ -2319,6 +2350,24 @@ func (cs *State) signAddVote(msgType cmtproto.SignedMsgType, hash []byte, header return nil } + //if msgType == cmtproto.PrecommitType { + // targetBlockTime := 11 * time.Second + // precommitVoteTime := cs.StartTime.Add(targetBlockTime) + // waitTime := precommitVoteTime.Sub(cmttime.Now()) + // schema.WritePrecommitTime(cs.traceClient, cs.Height, cs.Round, waitTime.Seconds()) + // if waitTime > 0 { + // //if waitTime > 11*time.Second { + // // cs.Logger.Debug("waiting for precommit vote was higher than"+ + // // " expected", "height", cs.Height, "round", cs.Round, + // // "waitTime", waitTime) + // // time.Sleep(11 * time.Second) + // //} else { + // // time.Sleep(waitTime) + // //} + // time.Sleep(waitTime) + // + // } + //} // TODO: pass pubKey to signVote vote, err := cs.signVote(msgType, hash, header) if err == nil { diff --git a/light/provider/http/http_test.go b/light/provider/http/http_test.go index 4067e6e185..9f732ec2f3 100644 --- a/light/provider/http/http_test.go +++ b/light/provider/http/http_test.go @@ -77,7 +77,13 @@ func TestProvider(t *testing.T) { require.Nil(t, lb) assert.Equal(t, provider.ErrHeightTooHigh, err) - _, err = p.LightBlock(context.Background(), 1) + // when we reach here, height 1 is expected to be pruned + // however, due to the delayed precommit logic which makes the node to + // sleep to match the 12s block time, + // it seems the pruning does not happen immediately, + // so adding a sleep here to see if that fixes the issue + time.Sleep(12 * 10 * time.Second) + lb, err = p.LightBlock(context.Background(), 1) require.Error(t, err) require.Nil(t, lb) assert.Equal(t, provider.ErrLightBlockNotFound, err) diff --git a/node/node_test.go b/node/node_test.go index db486b80c1..0fed731a46 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -50,11 +50,12 @@ func 
TestNodeStartStop(t *testing.T) { // wait for the node to produce a block blocksSub, err := n.EventBus().Subscribe(context.Background(), "node_test", types.EventQueryNewBlock) require.NoError(t, err) + blockTime := 12 * time.Second select { case <-blocksSub.Out(): case <-blocksSub.Cancelled(): t.Fatal("blocksSub was cancelled") - case <-time.After(10 * time.Second): + case <-time.After(blockTime): t.Fatal("timed out waiting for the node to produce a block") } diff --git a/pkg/trace/schema/consensus.go b/pkg/trace/schema/consensus.go index 6584429c2d..5e7fa19225 100644 --- a/pkg/trace/schema/consensus.go +++ b/pkg/trace/schema/consensus.go @@ -15,14 +15,59 @@ func ConsensusTables() []string { VoteTable, ConsensusStateTable, ProposalTable, + PrecommitTimeTable, + Proposer, } } +const ( + // PrecommitTimeTable + PrecommitTimeTable = "consensus_precommit_time" +) + +type PrecommitTime struct { + Height int64 `json:"height"` + Round int32 `json:"round"` + // amount of delay added before casting precommit for this Height and Round + Delay float64 `json:"delay"` +} + +func (p PrecommitTime) Table() string { + return PrecommitTimeTable +} + +func WritePrecommitTime(client trace.Tracer, height int64, round int32, delay float64) { + client.Write(PrecommitTime{Height: height, Round: round, Delay: delay}) + +} + +const Proposer = "consensus_proposer" + +type ProposerInfo struct { + Height int64 `json:"height"` + Round int32 `json:"round"` + Self string `json:"self"` + Proposer string `json:"address"` +} + +func (p ProposerInfo) Table() string { + return Proposer + +} + +func WriteProposer(client trace.Tracer, height int64, round int32, + self string, proposer string) { + client.Write(ProposerInfo{Height: height, Round: round, Self: self, Proposer: proposer}) +} + // Schema constants for the consensus round state tracing database. const ( // RoundStateTable is the name of the table that stores the consensus // state traces. 
- RoundStateTable = "consensus_round_state" + RoundStateTable = "consensus_round_state" + NewProposalArrived = uint8(100) + NewHeightByStartTime = uint8(101) + StartTimeIsReached = uint8(102) ) // RoundState describes schema for the "consensus_round_state" table. diff --git a/rpc/client/event_test.go b/rpc/client/event_test.go index d1d0b7a7de..378e989091 100644 --- a/rpc/client/event_test.go +++ b/rpc/client/event_test.go @@ -17,7 +17,8 @@ import ( "github.com/tendermint/tendermint/types" ) -var waitForEventTimeout = 8 * time.Second +var blockTime = 12 * time.Second +var waitForEventTimeout = 8*time.Second + blockTime // MakeTxKV returns a text transaction, allong with expected key, value pair func MakeTxKV() ([]byte, []byte, []byte) { diff --git a/rpc/core/env.go b/rpc/core/env.go index d71ccfbbed..99eb3b235a 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -27,6 +27,7 @@ const ( // SubscribeTimeout is the maximum time we wait to subscribe for an event. // must be less than the server's write timeout (see rpcserver.DefaultConfig) + // TODO this may need adjustment SubscribeTimeout = 5 * time.Second // genesisChunkSize is the maximum size, in bytes, of each diff --git a/rpc/grpc/grpc_test.go b/rpc/grpc/grpc_test.go index 073ff51c99..0d1c862997 100644 --- a/rpc/grpc/grpc_test.go +++ b/rpc/grpc/grpc_test.go @@ -4,6 +4,7 @@ import ( "context" "os" "testing" + "time" "github.com/stretchr/testify/require" @@ -25,8 +26,10 @@ func TestMain(m *testing.M) { } func TestBroadcastTx(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() res, err := rpctest.GetGRPCClient().BroadcastTx( - context.Background(), + ctx, &core_grpc.RequestBroadcastTx{Tx: []byte("this is a tx")}, ) require.NoError(t, err) diff --git a/test/e2e/runner/benchmark.go b/test/e2e/runner/benchmark.go index 545fb773d8..eec85d40aa 100644 --- a/test/e2e/runner/benchmark.go +++ b/test/e2e/runner/benchmark.go @@ -29,6 +29,8 @@ func Benchmark(testnet 
*e2e.Testnet, benchmarkLength int64) error { // wait for the length of the benchmark period in blocks to pass. We allow 5 seconds for each block // which should be sufficient. + // TODO the waitingTime may need some adjustment e.g., + // waitingTime := time.Duration(benchmarkLength*5) * 12 * time.Second waitingTime := time.Duration(benchmarkLength*5) * time.Second endHeight, err := waitForAllNodes(testnet, block.Height+benchmarkLength, waitingTime) if err != nil { diff --git a/test/e2e/runner/wait.go b/test/e2e/runner/wait.go index d89a8baacc..8b9ad51eb8 100644 --- a/test/e2e/runner/wait.go +++ b/test/e2e/runner/wait.go @@ -30,5 +30,7 @@ func WaitUntil(testnet *e2e.Testnet, height int64) error { // waitingTime estimates how long it should take for a node to reach the height. // More nodes in a network implies we may expect a slower network and may have to wait longer. func waitingTime(nodes int) time.Duration { - return time.Duration(20+(nodes*2)) * time.Second + blockTime := 12 * time.Second + // TODO: the role of the constant 20 in this formula is undocumented; confirm it before tuning 
+ return time.Duration(20+(nodes*2)) * blockTime } diff --git a/test/e2e/tests/app_test.go b/test/e2e/tests/app_test.go index 8b8d23960f..ba3de4a36e 100644 --- a/test/e2e/tests/app_test.go +++ b/test/e2e/tests/app_test.go @@ -75,7 +75,8 @@ func TestApp_Tx(t *testing.T) { require.NoError(t, err) hash := tx.Hash() - waitTime := 30 * time.Second + // a 15s wait (12s block time plus 3s of block time variance) works locally, but not on CI, + waitTime := 30 * time.Second // so the original 30s is kept require.Eventuallyf(t, func() bool { txResp, err := client.Tx(ctx, hash, false) return err == nil && bytes.Equal(txResp.Tx, tx) diff --git a/types/proposal.go b/types/proposal.go index 3401f6f055..6fba15724d 100644 --- a/types/proposal.go +++ b/types/proposal.go @@ -28,7 +28,7 @@ type Proposal struct { Round int32 `json:"round"` // there can not be greater than 2_147_483_647 rounds POLRound int32 `json:"pol_round"` // -1 if null. BlockID BlockID `json:"block_id"` - Timestamp time.Time `json:"timestamp"` + Timestamp time.Time `json:"timestamp"` // the time at which the proposal is made Signature []byte `json:"signature"` }