diff --git a/cmd/consensus/main.go b/cmd/consensus/main.go index 19f78052e48..f0dda64e98d 100644 --- a/cmd/consensus/main.go +++ b/cmd/consensus/main.go @@ -172,6 +172,7 @@ func main() { nodeBuilder. PreInit(cmd.DynamicStartPreInit). + ValidateRootSnapshot(badgerState.SanityCheckConsensusNodeRootSnapshotValidity). Module("consensus node metrics", func(node *cmd.NodeConfig) error { conMetrics = metrics.NewConsensusCollector(node.Tracer, node.MetricsRegisterer) return nil @@ -546,12 +547,13 @@ func main() { ) signer = verification.NewMetricsWrapper(signer, mainMetrics) // wrapper for measuring time spent with crypto-related operations + // create consensus logger + logger := createLogger(node.Logger, node.RootChainID) + // initialize a logging notifier for hotstuff notifier := createNotifier( - node.Logger, + logger, mainMetrics, - node.Tracer, - node.RootChainID, ) notifier.AddConsumer(finalizationDistributor) @@ -581,7 +583,7 @@ func main() { voteProcessorFactory := votecollector.NewCombinedVoteProcessorFactory(wrappedCommittee, qcDistributor.OnQcConstructedFromVotes) lowestViewForVoteProcessing := finalizedBlock.View + 1 voteAggregator, err := consensus.NewVoteAggregator( - node.Logger, + logger, mainMetrics, node.Metrics.Engine, node.Metrics.Mempool, @@ -594,9 +596,15 @@ func main() { } timeoutCollectorDistributor := pubsub.NewTimeoutCollectorDistributor() - timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory(timeoutCollectorDistributor, committee, validator, msig.ConsensusTimeoutTag) + timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory( + logger, + timeoutCollectorDistributor, + committee, + validator, + msig.ConsensusTimeoutTag, + ) timeoutAggregator, err := consensus.NewTimeoutAggregator( - node.Logger, + logger, mainMetrics, node.Metrics.Engine, node.Metrics.Mempool, @@ -669,7 +677,7 @@ func main() { // initialize hotstuff consensus algorithm hot, err = consensus.NewParticipant( - node.Logger, + createLogger(node.Logger, node.RootChainID), mainMetrics, build, finalizedBlock, @@ -689,7 +697,8 @@ func main() { // initialize the pending blocks cache proposals := buffer.NewPendingBlocks() - complianceCore, err := compliance.NewCore(node.Logger, + logger := createLogger(node.Logger, node.RootChainID) + complianceCore, err := compliance.NewCore(logger, node.Metrics.Engine, node.Metrics.Mempool, mainMetrics, @@ -713,7 +722,7 @@ func main() { // initialize the compliance engine comp, err = compliance.NewEngine( - node.Logger, + logger, node.Me, complianceCore, ) @@ -727,7 +736,7 @@ func main() { }). Component("consensus message hub", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { messageHub, err := message_hub.NewMessageHub( - node.Logger, + createLogger(node.Logger, node.RootChainID), node.Metrics.Engine, node.Network, node.Me, diff --git a/cmd/consensus/notifier.go b/cmd/consensus/notifier.go index 54523c64178..94fc57782e6 100644 --- a/cmd/consensus/notifier.go +++ b/cmd/consensus/notifier.go @@ -10,9 +10,14 @@ import ( metricsconsumer "github.com/onflow/flow-go/module/metrics/hotstuff" ) -func createNotifier(log zerolog.Logger, metrics module.HotstuffMetrics, tracer module.Tracer, chain flow.ChainID, -) *pubsub.Distributor { - telemetryConsumer := notifications.NewTelemetryConsumer(log, chain) +// createLogger creates logger which reports chain ID on every log message. 
+func createLogger(log zerolog.Logger, chainID flow.ChainID) zerolog.Logger { + return log.With().Str("chain", chainID.String()).Logger() +} + +// createNotifier creates a pubsub distributor and connects it to consensus consumers. +func createNotifier(log zerolog.Logger, metrics module.HotstuffMetrics) *pubsub.Distributor { + telemetryConsumer := notifications.NewTelemetryConsumer(log) metricsConsumer := metricsconsumer.NewMetricsConsumer(metrics) logsConsumer := notifications.NewLogConsumer(log) dis := pubsub.NewDistributor() diff --git a/cmd/node_builder.go b/cmd/node_builder.go index d85814caa38..f22134441a8 100644 --- a/cmd/node_builder.go +++ b/cmd/node_builder.go @@ -130,6 +130,10 @@ type NodeBuilder interface { // ValidateFlags sets any custom validation rules for the command line flags, // for example where certain combinations aren't allowed ValidateFlags(func() error) NodeBuilder + + // ValidateRootSnapshot sets any custom validation rules for the root snapshot. + // This check is executed after other checks but before applying any data from root snapshot. + ValidateRootSnapshot(f func(protocol.Snapshot) error) NodeBuilder } // BaseConfig is the general config for the NodeBuilder and the command line params diff --git a/cmd/scaffold.go b/cmd/scaffold.go index 76cbb12ddc8..12f1f8f766e 100644 --- a/cmd/scaffold.go +++ b/cmd/scaffold.go @@ -123,12 +123,15 @@ type FlowNodeBuilder struct { postShutdownFns []func() error preInitFns []BuilderFunc postInitFns []BuilderFunc + extraRootSnapshotCheck func(protocol.Snapshot) error extraFlagCheck func() error adminCommandBootstrapper *admin.CommandRunnerBootstrapper adminCommands map[string]func(config *NodeConfig) commands.AdminCommand componentBuilder component.ComponentManagerBuilder } +var _ NodeBuilder = (*FlowNodeBuilder)(nil) + func (fnb *FlowNodeBuilder) BaseFlags() { defaultConfig := DefaultBaseConfig() @@ -550,6 +553,11 @@ func (fnb *FlowNodeBuilder) ParseAndPrintFlags() error { return fnb.extraFlagsValidation() } +func (fnb *FlowNodeBuilder) ValidateRootSnapshot(f func(protocol.Snapshot) error) NodeBuilder { + fnb.extraRootSnapshotCheck = f + return fnb +} + func (fnb *FlowNodeBuilder) ValidateFlags(f func() error) NodeBuilder { fnb.extraFlagCheck = f return fnb @@ -1096,6 +1104,14 @@ func (fnb *FlowNodeBuilder) setRootSnapshot(rootSnapshot protocol.Snapshot) erro return fmt.Errorf("failed to validate root snapshot QCs: %w", err) } + // perform extra checks requested by specific node types + if fnb.extraRootSnapshotCheck != nil { + err = fnb.extraRootSnapshotCheck(rootSnapshot) + if err != nil { + return fmt.Errorf("failed to perform extra checks on root snapshot: %w", err) + } + } + fnb.RootSnapshot = rootSnapshot // cache properties of the root snapshot, for convenience fnb.RootResult, fnb.RootSeal, err = fnb.RootSnapshot.SealedResult() diff --git a/consensus/aggregators.go b/consensus/aggregators.go index be4df3b3842..10bf86083c8 100644 --- a/consensus/aggregators.go +++ b/consensus/aggregators.go @@ -61,7 +61,7 @@ func NewTimeoutAggregator(log zerolog.Logger, lowestRetainedView uint64, ) (hotstuff.TimeoutAggregator, error) { - timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(notifier, distributor, timeoutProcessorFactory) + timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(log, notifier, distributor, timeoutProcessorFactory) collectors := timeoutaggregator.NewTimeoutCollectors(log, lowestRetainedView, timeoutCollectorFactory) // initialize the timeout aggregator diff --git 
a/consensus/hotstuff/consumer.go b/consensus/hotstuff/consumer.go index 588520b90d1..5eb592b9912 100644 --- a/consensus/hotstuff/consumer.go +++ b/consensus/hotstuff/consumer.go @@ -115,7 +115,7 @@ type Consumer interface { // Prerequisites: // Implementation must be concurrency safe; Non-blocking; // and must handle repetition of the same events (with some processing overhead). - OnQcTriggeredViewChange(qc *flow.QuorumCertificate, newView uint64) + OnQcTriggeredViewChange(oldView uint64, newView uint64, qc *flow.QuorumCertificate) // OnTcTriggeredViewChange notifications are produced by PaceMaker when it moves to a new view // based on processing a TC. The arguments specify the tc (first argument), which triggered @@ -123,7 +123,7 @@ type Consumer interface { // Prerequisites: // Implementation must be concurrency safe; Non-blocking; // and must handle repetition of the same events (with some processing overhead). - OnTcTriggeredViewChange(tc *flow.TimeoutCertificate, newView uint64) + OnTcTriggeredViewChange(oldView uint64, newView uint64, tc *flow.TimeoutCertificate) // OnStartingTimeout notifications are produced by PaceMaker. Such a notification indicates that the // PaceMaker is now waiting for the system to (receive and) process blocks or votes. @@ -133,6 +133,36 @@ type Consumer interface { // and must handle repetition of the same events (with some processing overhead). OnStartingTimeout(model.TimerInfo) + // OnVoteProcessed notifications are produced by the Vote Aggregation logic, each time + // we successfully ingest a valid vote. + // Prerequisites: + // Implementation must be concurrency safe; Non-blocking; + // and must handle repetition of the same events (with some processing overhead). + OnVoteProcessed(vote *model.Vote) + + // OnTimeoutProcessed notifications are produced by the Timeout Aggregation logic, + // each time we successfully ingest a valid timeout. + // Prerequisites: + // Implementation must be concurrency safe; Non-blocking; + // and must handle repetition of the same events (with some processing overhead). + OnTimeoutProcessed(timeout *model.TimeoutObject) + + // OnCurrentViewDetails notifications are produced by the EventHandler during the course of a view with auxiliary information. + // These notifications are generally not produced for all views (for example skipped views). + // These notifications are guaranteed to be produced for all views we enter after fully processing a message. + // Example 1: + // - We are in view 8. We process a QC with view 10, causing us to enter view 11. + // - Then this notification will be produced for view 11. + // Example 2: + // - We are in view 8. We process a proposal with view 10, which contains a TC for view 9 and TC.NewestQC for view 8. + // - The QC would allow us to enter view 9 and the TC would allow us to enter view 10, + // so after fully processing the message we are in view 10. + // - Then this notification will be produced for view 10, but not view 9 + // Prerequisites: + // Implementation must be concurrency safe; Non-blocking; + // and must handle repetition of the same events (with some processing overhead). + OnCurrentViewDetails(currentView, finalizedView uint64, currentLeader flow.Identifier) + // OnDoubleVotingDetected notifications are produced by the Vote Aggregation logic // whenever a double voting (same voter voting for different blocks at the same view) was detected. 
// Prerequisites: @@ -145,7 +175,7 @@ type Consumer interface { // Prerequisites: // Implementation must be concurrency safe; Non-blocking; // and must handle repetition of the same events (with some processing overhead). - OnInvalidVoteDetected(*model.Vote) + OnInvalidVoteDetected(err model.InvalidVoteError) // OnVoteForInvalidBlockDetected notifications are produced by the Vote Aggregation logic // whenever vote for invalid proposal was detected. @@ -166,7 +196,7 @@ type Consumer interface { // Prerequisites: // Implementation must be concurrency safe; Non-blocking; // and must handle repetition of the same events (with some processing overhead). - OnInvalidTimeoutDetected(*model.TimeoutObject) + OnInvalidTimeoutDetected(err model.InvalidTimeoutError) } // QCCreatedConsumer consumes outbound notifications produced by HotStuff and its components. diff --git a/consensus/hotstuff/eventhandler/event_handler.go b/consensus/hotstuff/eventhandler/event_handler.go index 3371673eade..f9304401a6c 100644 --- a/consensus/hotstuff/eventhandler/event_handler.go +++ b/consensus/hotstuff/eventhandler/event_handler.go @@ -369,11 +369,14 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error { if err != nil { return fmt.Errorf("failed to determine primary for new view %d: %w", curView, err) } + finalizedView := e.forks.FinalizedView() log := e.log.With(). Uint64("cur_view", curView). - Uint64("finalized_view", e.forks.FinalizedView()). + Uint64("finalized_view", finalizedView). Hex("leader_id", currentLeader[:]).Logger() + e.notifier.OnCurrentViewDetails(curView, finalizedView, currentLeader) + // check that I am the primary for this view and that I haven't already proposed; otherwise there is nothing to do if e.committee.Self() != currentLeader { return nil diff --git a/consensus/hotstuff/eventhandler/event_handler_test.go b/consensus/hotstuff/eventhandler/event_handler_test.go index 0d5815a7076..485b0cc91f2 100644 --- a/consensus/hotstuff/eventhandler/event_handler_test.go +++ b/consensus/hotstuff/eventhandler/event_handler_test.go @@ -87,8 +87,8 @@ func initPaceMaker(t require.TestingT, ctx context.Context, livenessData *hotstu persist.On("GetLivenessData").Return(livenessData, nil).Once() pm := NewTestPaceMaker(timeout.NewController(tc), notifier, persist) notifier.On("OnStartingTimeout", mock.Anything).Return() - notifier.On("OnQcTriggeredViewChange", mock.Anything, mock.Anything).Return() - notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything).Return() + notifier.On("OnQcTriggeredViewChange", mock.Anything, mock.Anything, mock.Anything).Return() + notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything, mock.Anything).Return() notifier.On("OnViewChange", mock.Anything, mock.Anything).Maybe() pm.Start(ctx) return pm @@ -301,6 +301,7 @@ func (es *EventHandlerSuite) SetupTest() { es.notifier.On("OnReceiveTc", mock.Anything, mock.Anything).Maybe() es.notifier.On("OnPartialTc", mock.Anything, mock.Anything).Maybe() es.notifier.On("OnLocalTimeout", mock.Anything).Maybe() + es.notifier.On("OnCurrentViewDetails", mock.Anything, mock.Anything, mock.Anything).Maybe() eventhandler, err := NewEventHandler( zerolog.New(os.Stderr), diff --git a/consensus/hotstuff/integration/instance_test.go b/consensus/hotstuff/integration/instance_test.go index 90699c17862..68aa714d1ba 100644 --- a/consensus/hotstuff/integration/instance_test.go +++ b/consensus/hotstuff/integration/instance_test.go @@ -484,11 +484,22 @@ func NewInstance(t *testing.T, options ...Option) *Instance { nil, ).Maybe() 
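Since the Consumer interface above gains OnVoteProcessed, OnTimeoutProcessed and OnCurrentViewDetails, test fixtures built on mocks.Consumer need expectations for the new callbacks; the event handler and pacemaker tests in this diff add them individually. A minimal sketch of a shared helper, under the assumption that fixtures hold a *mocks.Consumer; the package and helper names are illustrative and not part of this change:

	package helper_example

	import (
		"github.com/stretchr/testify/mock"

		"github.com/onflow/flow-go/consensus/hotstuff/mocks"
	)

	// allowNewHotstuffNotifications registers permissive expectations for the notifications
	// introduced in this change, so existing fixtures keep passing without asserting on them.
	func allowNewHotstuffNotifications(notifier *mocks.Consumer) {
		notifier.On("OnCurrentViewDetails", mock.Anything, mock.Anything, mock.Anything).Maybe()
		notifier.On("OnVoteProcessed", mock.Anything).Maybe()
		notifier.On("OnTimeoutProcessed", mock.Anything).Maybe()
	}
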
- p, err := timeoutcollector.NewTimeoutProcessor(in.committee, in.validator, aggregator, collectorDistributor) + p, err := timeoutcollector.NewTimeoutProcessor( + unittest.Logger(), + in.committee, + in.validator, + aggregator, + collectorDistributor, + ) require.NoError(t, err) return p }, nil).Maybe() - timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(notifier, collectorDistributor, timeoutProcessorFactory) + timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory( + unittest.Logger(), + notifier, + collectorDistributor, + timeoutProcessorFactory, + ) timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(log, livenessData.CurrentView, timeoutCollectorFactory) // initialize the timeout aggregator diff --git a/consensus/hotstuff/mocks/consumer.go b/consensus/hotstuff/mocks/consumer.go index 0f4addd0cc8..372242f3659 100644 --- a/consensus/hotstuff/mocks/consumer.go +++ b/consensus/hotstuff/mocks/consumer.go @@ -23,6 +23,11 @@ func (_m *Consumer) OnBlockIncorporated(_a0 *model.Block) { _m.Called(_a0) } +// OnCurrentViewDetails provides a mock function with given fields: currentView, finalizedView, currentLeader +func (_m *Consumer) OnCurrentViewDetails(currentView uint64, finalizedView uint64, currentLeader flow.Identifier) { + _m.Called(currentView, finalizedView, currentLeader) +} + // OnDoubleProposeDetected provides a mock function with given fields: _a0, _a1 func (_m *Consumer) OnDoubleProposeDetected(_a0 *model.Block, _a1 *model.Block) { _m.Called(_a0, _a1) @@ -48,14 +53,14 @@ func (_m *Consumer) OnFinalizedBlock(_a0 *model.Block) { _m.Called(_a0) } -// OnInvalidTimeoutDetected provides a mock function with given fields: _a0 -func (_m *Consumer) OnInvalidTimeoutDetected(_a0 *model.TimeoutObject) { - _m.Called(_a0) +// OnInvalidTimeoutDetected provides a mock function with given fields: err +func (_m *Consumer) OnInvalidTimeoutDetected(err model.InvalidTimeoutError) { + _m.Called(err) } -// OnInvalidVoteDetected provides a mock function with given fields: _a0 -func (_m *Consumer) OnInvalidVoteDetected(_a0 *model.Vote) { - _m.Called(_a0) +// OnInvalidVoteDetected provides a mock function with given fields: err +func (_m *Consumer) OnInvalidVoteDetected(err model.InvalidVoteError) { + _m.Called(err) } // OnLocalTimeout provides a mock function with given fields: currentView @@ -83,9 +88,9 @@ func (_m *Consumer) OnPartialTc(currentView uint64, partialTc *hotstuff.PartialT _m.Called(currentView, partialTc) } -// OnQcTriggeredViewChange provides a mock function with given fields: qc, newView -func (_m *Consumer) OnQcTriggeredViewChange(qc *flow.QuorumCertificate, newView uint64) { - _m.Called(qc, newView) +// OnQcTriggeredViewChange provides a mock function with given fields: oldView, newView, qc +func (_m *Consumer) OnQcTriggeredViewChange(oldView uint64, newView uint64, qc *flow.QuorumCertificate) { + _m.Called(oldView, newView, qc) } // OnReceiveProposal provides a mock function with given fields: currentView, proposal @@ -113,9 +118,14 @@ func (_m *Consumer) OnStartingTimeout(_a0 model.TimerInfo) { _m.Called(_a0) } -// OnTcTriggeredViewChange provides a mock function with given fields: tc, newView -func (_m *Consumer) OnTcTriggeredViewChange(tc *flow.TimeoutCertificate, newView uint64) { - _m.Called(tc, newView) +// OnTcTriggeredViewChange provides a mock function with given fields: oldView, newView, tc +func (_m *Consumer) OnTcTriggeredViewChange(oldView uint64, newView uint64, tc *flow.TimeoutCertificate) { + _m.Called(oldView, newView, tc) +} 
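As a usage sketch of the reworked Consumer callbacks (the embedding pattern mirrors TelemetryConsumer further down in this diff; the type and field names here are illustrative, not part of this change):

	package consumer_example

	import (
		"github.com/rs/zerolog"

		"github.com/onflow/flow-go/consensus/hotstuff/model"
		"github.com/onflow/flow-go/consensus/hotstuff/notifications"
	)

	// aggregationLogger only cares about the aggregation-related callbacks and inherits
	// no-op implementations for the rest of hotstuff.Consumer from NoopConsumer.
	type aggregationLogger struct {
		notifications.NoopConsumer
		log zerolog.Logger
	}

	// OnVoteProcessed is invoked for every vote that vote aggregation successfully ingested.
	func (c *aggregationLogger) OnVoteProcessed(vote *model.Vote) {
		c.log.Debug().Uint64("view", vote.View).Hex("block_id", vote.BlockID[:]).Msg("vote processed")
	}

	// OnInvalidVoteDetected now receives the full InvalidVoteError, so the consumer has
	// access to both the offending vote and the reason it was rejected.
	func (c *aggregationLogger) OnInvalidVoteDetected(err model.InvalidVoteError) {
		c.log.Warn().Uint64("view", err.Vote.View).Err(err.Err).Msg("invalid vote detected")
	}
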
+ +// OnTimeoutProcessed provides a mock function with given fields: timeout +func (_m *Consumer) OnTimeoutProcessed(timeout *model.TimeoutObject) { + _m.Called(timeout) } // OnViewChange provides a mock function with given fields: oldView, newView @@ -128,6 +138,11 @@ func (_m *Consumer) OnVoteForInvalidBlockDetected(vote *model.Vote, invalidPropo _m.Called(vote, invalidProposal) } +// OnVoteProcessed provides a mock function with given fields: vote +func (_m *Consumer) OnVoteProcessed(vote *model.Vote) { + _m.Called(vote) +} + type mockConstructorTestingTNewConsumer interface { mock.TestingT Cleanup(func()) diff --git a/consensus/hotstuff/model/errors.go b/consensus/hotstuff/model/errors.go index f3b54ed644e..cbc167384d5 100644 --- a/consensus/hotstuff/model/errors.go +++ b/consensus/hotstuff/model/errors.go @@ -186,21 +186,19 @@ func (e InvalidBlockError) Unwrap() error { // InvalidVoteError indicates that the vote with identifier `VoteID` is invalid type InvalidVoteError struct { - VoteID flow.Identifier - View uint64 - Err error + Vote *Vote + Err error } func NewInvalidVoteErrorf(vote *Vote, msg string, args ...interface{}) error { return InvalidVoteError{ - VoteID: vote.ID(), - View: vote.View, - Err: fmt.Errorf(msg, args...), + Vote: vote, + Err: fmt.Errorf(msg, args...), } } func (e InvalidVoteError) Error() string { - return fmt.Sprintf("invalid vote %x for view %d: %s", e.VoteID, e.View, e.Err.Error()) + return fmt.Sprintf("invalid vote at view %d for block %x: %s", e.Vote.View, e.Vote.BlockID, e.Err.Error()) } // IsInvalidVoteError returns whether an error is InvalidVoteError @@ -209,6 +207,17 @@ func IsInvalidVoteError(err error) bool { return errors.As(err, &e) } +// AsInvalidVoteError determines whether the given error is a InvalidVoteError +// (potentially wrapped). It follows the same semantics as a checked type cast. +func AsInvalidVoteError(err error) (*InvalidVoteError, bool) { + var e InvalidVoteError + ok := errors.As(err, &e) + if ok { + return &e, true + } + return nil, false +} + func (e InvalidVoteError) Unwrap() error { return e.Err } @@ -423,6 +432,17 @@ func IsInvalidTimeoutError(err error) bool { return errors.As(err, &e) } +// AsInvalidTimeoutError determines whether the given error is a InvalidTimeoutError +// (potentially wrapped). It follows the same semantics as a checked type cast. +func AsInvalidTimeoutError(err error) (*InvalidTimeoutError, bool) { + var e InvalidTimeoutError + ok := errors.As(err, &e) + if ok { + return &e, true + } + return nil, false +} + func (e InvalidTimeoutError) Unwrap() error { return e.Err } diff --git a/consensus/hotstuff/notifications/log_consumer.go b/consensus/hotstuff/notifications/log_consumer.go index 51f35f051c4..0f3329c356d 100644 --- a/consensus/hotstuff/notifications/log_consumer.go +++ b/consensus/hotstuff/notifications/log_consumer.go @@ -117,19 +117,21 @@ func (lc *LogConsumer) OnViewChange(oldView, newView uint64) { Msg("entered new view") } -func (lc *LogConsumer) OnQcTriggeredViewChange(qc *flow.QuorumCertificate, newView uint64) { +func (lc *LogConsumer) OnQcTriggeredViewChange(oldView uint64, newView uint64, qc *flow.QuorumCertificate) { lc.log.Debug(). Uint64("qc_view", qc.View). Hex("qc_block_id", qc.BlockID[:]). + Uint64("old_view", oldView). Uint64("new_view", newView). 
Msg("QC triggered view change") } -func (lc *LogConsumer) OnTcTriggeredViewChange(tc *flow.TimeoutCertificate, newView uint64) { +func (lc *LogConsumer) OnTcTriggeredViewChange(oldView uint64, newView uint64, tc *flow.TimeoutCertificate) { lc.log.Debug(). Uint64("tc_view", tc.View). Uint64("tc_newest_qc_view", tc.NewestQC.View). Uint64("new_view", newView). + Uint64("old_view", oldView). Msg("TC triggered view change") } @@ -140,6 +142,27 @@ func (lc *LogConsumer) OnStartingTimeout(info model.TimerInfo) { Msg("timeout started") } +func (lc *LogConsumer) OnVoteProcessed(vote *model.Vote) { + lc.log.Debug(). + Hex("block_id", vote.BlockID[:]). + Uint64("block_view", vote.View). + Hex("recipient_id", vote.SignerID[:]). + Msg("processed valid HotStuff vote") +} + +func (lc *LogConsumer) OnTimeoutProcessed(timeout *model.TimeoutObject) { + log := timeout.LogContext(lc.log).Logger() + log.Debug().Msg("processed valid timeout object") +} + +func (lc *LogConsumer) OnCurrentViewDetails(currentView, finalizedView uint64, currentLeader flow.Identifier) { + lc.log.Info(). + Uint64("view", currentView). + Uint64("finalized_view", finalizedView). + Hex("current_leader", currentLeader[:]). + Msg("current view details") +} + func (lc *LogConsumer) OnDoubleVotingDetected(vote *model.Vote, alt *model.Vote) { lc.log.Warn(). Uint64("vote_view", vote.View). @@ -149,12 +172,12 @@ func (lc *LogConsumer) OnDoubleVotingDetected(vote *model.Vote, alt *model.Vote) Msg("double vote detected") } -func (lc *LogConsumer) OnInvalidVoteDetected(vote *model.Vote) { +func (lc *LogConsumer) OnInvalidVoteDetected(err model.InvalidVoteError) { lc.log.Warn(). - Uint64("vote_view", vote.View). - Hex("voted_block_id", vote.BlockID[:]). - Hex("voter_id", vote.SignerID[:]). - Msg("invalid vote detected") + Uint64("vote_view", err.Vote.View). + Hex("voted_block_id", err.Vote.BlockID[:]). + Hex("voter_id", err.Vote.SignerID[:]). 
+ Msgf("invalid vote detected: %s", err.Error()) } func (lc *LogConsumer) OnVoteForInvalidBlockDetected(vote *model.Vote, proposal *model.Proposal) { @@ -175,9 +198,9 @@ func (lc *LogConsumer) OnDoubleTimeoutDetected(timeout *model.TimeoutObject, alt Msg("double timeout detected") } -func (lc *LogConsumer) OnInvalidTimeoutDetected(timeout *model.TimeoutObject) { - log := timeout.LogContext(lc.log).Logger() - log.Warn().Msg("invalid timeout detected") +func (lc *LogConsumer) OnInvalidTimeoutDetected(err model.InvalidTimeoutError) { + log := err.Timeout.LogContext(lc.log).Logger() + log.Warn().Msgf("invalid timeout detected: %s", err.Error()) } func (lc *LogConsumer) logBasicBlockData(loggerEvent *zerolog.Event, block *model.Block) *zerolog.Event { diff --git a/consensus/hotstuff/notifications/noop_consumer.go b/consensus/hotstuff/notifications/noop_consumer.go index f23c02cd22f..b5d980acdd3 100644 --- a/consensus/hotstuff/notifications/noop_consumer.go +++ b/consensus/hotstuff/notifications/noop_consumer.go @@ -43,21 +43,27 @@ func (*NoopPartialConsumer) OnLocalTimeout(uint64) {} func (*NoopPartialConsumer) OnViewChange(uint64, uint64) {} -func (*NoopPartialConsumer) OnQcTriggeredViewChange(*flow.QuorumCertificate, uint64) {} +func (*NoopPartialConsumer) OnQcTriggeredViewChange(uint64, uint64, *flow.QuorumCertificate) {} -func (*NoopPartialConsumer) OnTcTriggeredViewChange(*flow.TimeoutCertificate, uint64) {} +func (*NoopPartialConsumer) OnTcTriggeredViewChange(uint64, uint64, *flow.TimeoutCertificate) {} func (*NoopPartialConsumer) OnStartingTimeout(model.TimerInfo) {} +func (*NoopPartialConsumer) OnVoteProcessed(*model.Vote) {} + +func (*NoopPartialConsumer) OnTimeoutProcessed(*model.TimeoutObject) {} + +func (*NoopPartialConsumer) OnCurrentViewDetails(uint64, uint64, flow.Identifier) {} + func (*NoopPartialConsumer) OnDoubleVotingDetected(*model.Vote, *model.Vote) {} -func (*NoopPartialConsumer) OnInvalidVoteDetected(*model.Vote) {} +func (*NoopPartialConsumer) OnInvalidVoteDetected(model.InvalidVoteError) {} func (*NoopPartialConsumer) OnVoteForInvalidBlockDetected(*model.Vote, *model.Proposal) {} func (*NoopPartialConsumer) OnDoubleTimeoutDetected(*model.TimeoutObject, *model.TimeoutObject) {} -func (*NoopPartialConsumer) OnInvalidTimeoutDetected(*model.TimeoutObject) {} +func (*NoopPartialConsumer) OnInvalidTimeoutDetected(model.InvalidTimeoutError) {} // no-op implementation of hotstuff.FinalizationConsumer diff --git a/consensus/hotstuff/notifications/pubsub/distributor.go b/consensus/hotstuff/notifications/pubsub/distributor.go index a754ff75b21..d122ad8cde3 100644 --- a/consensus/hotstuff/notifications/pubsub/distributor.go +++ b/consensus/hotstuff/notifications/pubsub/distributor.go @@ -31,7 +31,7 @@ func NewDistributor() *Distributor { return &Distributor{} } -// AddConsumer adds an a event consumer to the Distributor +// AddConsumer adds an event consumer to the Distributor func (p *Distributor) AddConsumer(consumer hotstuff.Consumer) { p.lock.Lock() defer p.lock.Unlock() @@ -94,19 +94,19 @@ func (p *Distributor) OnViewChange(oldView, newView uint64) { } } -func (p *Distributor) OnQcTriggeredViewChange(qc *flow.QuorumCertificate, newView uint64) { +func (p *Distributor) OnQcTriggeredViewChange(oldView uint64, newView uint64, qc *flow.QuorumCertificate) { p.lock.RLock() defer p.lock.RUnlock() for _, subscriber := range p.subscribers { - subscriber.OnQcTriggeredViewChange(qc, newView) + subscriber.OnQcTriggeredViewChange(oldView, newView, qc) } } -func (p *Distributor) 
OnTcTriggeredViewChange(tc *flow.TimeoutCertificate, newView uint64) { +func (p *Distributor) OnTcTriggeredViewChange(oldView uint64, newView uint64, tc *flow.TimeoutCertificate) { p.lock.RLock() defer p.lock.RUnlock() for _, subscriber := range p.subscribers { - subscriber.OnTcTriggeredViewChange(tc, newView) + subscriber.OnTcTriggeredViewChange(oldView, newView, tc) } } @@ -118,6 +118,30 @@ func (p *Distributor) OnStartingTimeout(timerInfo model.TimerInfo) { } } +func (p *Distributor) OnVoteProcessed(vote *model.Vote) { + p.lock.RLock() + defer p.lock.RUnlock() + for _, subscriber := range p.subscribers { + subscriber.OnVoteProcessed(vote) + } +} + +func (p *Distributor) OnTimeoutProcessed(timeout *model.TimeoutObject) { + p.lock.RLock() + defer p.lock.RUnlock() + for _, subscriber := range p.subscribers { + subscriber.OnTimeoutProcessed(timeout) + } +} + +func (p *Distributor) OnCurrentViewDetails(currentView, finalizedView uint64, currentLeader flow.Identifier) { + p.lock.RLock() + defer p.lock.RUnlock() + for _, subscriber := range p.subscribers { + subscriber.OnCurrentViewDetails(currentView, finalizedView, currentLeader) + } +} + func (p *Distributor) OnBlockIncorporated(block *model.Block) { p.lock.RLock() defer p.lock.RUnlock() @@ -150,11 +174,11 @@ func (p *Distributor) OnDoubleVotingDetected(vote1, vote2 *model.Vote) { } } -func (p *Distributor) OnInvalidVoteDetected(vote *model.Vote) { +func (p *Distributor) OnInvalidVoteDetected(err model.InvalidVoteError) { p.lock.RLock() defer p.lock.RUnlock() for _, subscriber := range p.subscribers { - subscriber.OnInvalidVoteDetected(vote) + subscriber.OnInvalidVoteDetected(err) } } @@ -174,11 +198,11 @@ func (p *Distributor) OnDoubleTimeoutDetected(timeout *model.TimeoutObject, altT } } -func (p *Distributor) OnInvalidTimeoutDetected(timeout *model.TimeoutObject) { +func (p *Distributor) OnInvalidTimeoutDetected(err model.InvalidTimeoutError) { p.lock.RLock() defer p.lock.RUnlock() for _, subscriber := range p.subscribers { - subscriber.OnInvalidTimeoutDetected(timeout) + subscriber.OnInvalidTimeoutDetected(err) } } diff --git a/consensus/hotstuff/notifications/telemetry.go b/consensus/hotstuff/notifications/telemetry.go index fd7e473680e..67f0ca1339a 100644 --- a/consensus/hotstuff/notifications/telemetry.go +++ b/consensus/hotstuff/notifications/telemetry.go @@ -25,20 +25,27 @@ import ( // -- a local timeout has been initiated // - Each path through the state machine is identified by a unique id. // +// Additionally, the TelemetryConsumer reports events related to vote and timeout aggregation +// but those events are not bound to a path, so they are reported differently. // Generally, the TelemetryConsumer could export the collected data to a variety of backends. // For now, we export the data to a logger. // // Telemetry does NOT capture slashing notifications type TelemetryConsumer struct { NoopConsumer - pathHandler *PathHandler + pathHandler *PathHandler + noPathLogger zerolog.Logger } var _ hotstuff.Consumer = (*TelemetryConsumer)(nil) -func NewTelemetryConsumer(log zerolog.Logger, chain flow.ChainID) *TelemetryConsumer { +// NewTelemetryConsumer creates consumer that reports telemetry events using logger backend. +// Logger MUST include `chain` parameter as part of log context with corresponding chain ID to correctly map telemetry events to chain. 
+func NewTelemetryConsumer(log zerolog.Logger) *TelemetryConsumer { + pathHandler := NewPathHandler(log) return &TelemetryConsumer{ - pathHandler: NewPathHandler(log, chain), + pathHandler: pathHandler, + noPathLogger: pathHandler.log, } } @@ -137,17 +144,19 @@ func (t *TelemetryConsumer) OnFinalizedBlock(block *model.Block) { Msg("OnFinalizedBlock") } -func (t *TelemetryConsumer) OnQcTriggeredViewChange(qc *flow.QuorumCertificate, newView uint64) { +func (t *TelemetryConsumer) OnQcTriggeredViewChange(oldView uint64, newView uint64, qc *flow.QuorumCertificate) { t.pathHandler.NextStep(). Uint64("qc_view", qc.View). + Uint64("old_view", oldView). Uint64("next_view", newView). Hex("qc_block_id", qc.BlockID[:]). Msg("OnQcTriggeredViewChange") } -func (t *TelemetryConsumer) OnTcTriggeredViewChange(tc *flow.TimeoutCertificate, newView uint64) { +func (t *TelemetryConsumer) OnTcTriggeredViewChange(oldView uint64, newView uint64, tc *flow.TimeoutCertificate) { t.pathHandler.NextStep(). Uint64("tc_view", tc.View). + Uint64("old_view", oldView). Uint64("next_view", newView). Uint64("tc_newest_qc_view", tc.NewestQC.View). Hex("tc_newest_qc_block_id", tc.NewestQC.BlockID[:]). @@ -198,6 +207,39 @@ func (t *TelemetryConsumer) OnOwnTimeout(timeout *model.TimeoutObject) { step.Msg("OnOwnTimeout") } +func (t *TelemetryConsumer) OnVoteProcessed(vote *model.Vote) { + t.noPathLogger.Info(). + Uint64("voted_block_view", vote.View). + Hex("voted_block_id", logging.ID(vote.BlockID)). + Hex("signer_id", logging.ID(vote.SignerID)). + Msg("OnVoteProcessed") +} + +func (t *TelemetryConsumer) OnTimeoutProcessed(timeout *model.TimeoutObject) { + step := t.noPathLogger.Info(). + Uint64("view", timeout.View). + Uint64("timeout_tick", timeout.TimeoutTick). + Uint64("newest_qc_view", timeout.NewestQC.View). + Hex("newest_qc_block_id", logging.ID(timeout.NewestQC.BlockID)). + Hex("signer_id", logging.ID(timeout.SignerID)) + lastViewTC := timeout.LastViewTC + if lastViewTC != nil { + step. + Uint64("last_view_tc_view", lastViewTC.View). + Uint64("last_view_tc_newest_qc_view", lastViewTC.NewestQC.View). + Hex("last_view_tc_newest_qc_block_id", logging.ID(lastViewTC.NewestQC.BlockID)) + } + step.Msg("OnTimeoutProcessed") +} + +func (t *TelemetryConsumer) OnCurrentViewDetails(currentView, finalizedView uint64, currentLeader flow.Identifier) { + t.pathHandler.NextStep(). + Uint64("view", currentView). + Uint64("finalized_view", finalizedView). + Hex("current_leader", currentLeader[:]). + Msg("OnCurrentViewDetails") +} + // PathHandler maintains a notion of the current path through the state machine. // It allows to close a path and open new path. Each path is identified by a unique // (randomly generated) uuid. Along each path, we can capture information about relevant @@ -205,8 +247,7 @@ func (t *TelemetryConsumer) OnOwnTimeout(timeout *model.TimeoutObject) { // In case there is no currently open path, the PathHandler still returns a Step, // but such steps are logged as telemetry errors. type PathHandler struct { - chain flow.ChainID - log zerolog.Logger + log zerolog.Logger // currentPath holds a Zerolog Context with the information about the current path. // We represent the case where the current path has been closed by nil value. @@ -215,10 +256,10 @@ type PathHandler struct { // NewPathHandler instantiate a new PathHandler. 
// The PathHandler has no currently open path -func NewPathHandler(log zerolog.Logger, chain flow.ChainID) *PathHandler { +// Logger MUST include `chain` parameter as part of log context with corresponding chain ID to correctly map telemetry events to chain. +func NewPathHandler(log zerolog.Logger) *PathHandler { return &PathHandler{ - chain: chain, - log: log.With().Str("hotstuff", "telemetry").Str("chain", chain.String()).Logger(), + log: log.With().Str("component", "hotstuff.telemetry").Logger(), currentPath: nil, } } diff --git a/consensus/hotstuff/pacemaker/pacemaker.go b/consensus/hotstuff/pacemaker/pacemaker.go index 325c8ab2b88..8cb5ca3848e 100644 --- a/consensus/hotstuff/pacemaker/pacemaker.go +++ b/consensus/hotstuff/pacemaker/pacemaker.go @@ -144,7 +144,7 @@ func (p *ActivePaceMaker) ProcessQC(qc *flow.QuorumCertificate) (*model.NewViewE return nil, err } - p.notifier.OnQcTriggeredViewChange(qc, newView) + p.notifier.OnQcTriggeredViewChange(oldView, newView, qc) p.notifier.OnViewChange(oldView, newView) timerInfo := p.timeoutControl.StartTimeout(p.ctx, newView) @@ -185,7 +185,7 @@ func (p *ActivePaceMaker) ProcessTC(tc *flow.TimeoutCertificate) (*model.NewView return nil, err } - p.notifier.OnTcTriggeredViewChange(tc, newView) + p.notifier.OnTcTriggeredViewChange(oldView, newView, tc) p.notifier.OnViewChange(oldView, newView) timerInfo := p.timeoutControl.StartTimeout(p.ctx, newView) diff --git a/consensus/hotstuff/pacemaker/pacemaker_test.go b/consensus/hotstuff/pacemaker/pacemaker_test.go index 84ea999e877..b0a8f70861d 100644 --- a/consensus/hotstuff/pacemaker/pacemaker_test.go +++ b/consensus/hotstuff/pacemaker/pacemaker_test.go @@ -99,7 +99,7 @@ func (s *ActivePaceMakerTestSuite) TestProcessQC_SkipIncreaseViewThroughQC() { qc := QC(s.livenessData.CurrentView) s.persist.On("PutLivenessData", LivenessData(qc)).Return(nil).Once() s.notifier.On("OnStartingTimeout", expectedTimerInfo(4)).Return().Once() - s.notifier.On("OnQcTriggeredViewChange", qc, uint64(4)).Return().Once() + s.notifier.On("OnQcTriggeredViewChange", s.livenessData.CurrentView, uint64(4), qc).Return().Once() s.notifier.On("OnViewChange", s.livenessData.CurrentView, qc.View+1).Once() nve, err := s.paceMaker.ProcessQC(qc) require.NoError(s.T(), err) @@ -112,7 +112,7 @@ func (s *ActivePaceMakerTestSuite) TestProcessQC_SkipIncreaseViewThroughQC() { qc = QC(s.livenessData.CurrentView + 10) s.persist.On("PutLivenessData", LivenessData(qc)).Return(nil).Once() s.notifier.On("OnStartingTimeout", expectedTimerInfo(qc.View+1)).Return().Once() - s.notifier.On("OnQcTriggeredViewChange", qc, qc.View+1).Return().Once() + s.notifier.On("OnQcTriggeredViewChange", s.livenessData.CurrentView, qc.View+1, qc).Return().Once() s.notifier.On("OnViewChange", s.livenessData.CurrentView, qc.View+1).Once() nve, err = s.paceMaker.ProcessQC(qc) require.NoError(s.T(), err) @@ -135,7 +135,7 @@ func (s *ActivePaceMakerTestSuite) TestProcessTC_SkipIncreaseViewThroughTC() { } s.persist.On("PutLivenessData", expectedLivenessData).Return(nil).Once() s.notifier.On("OnStartingTimeout", expectedTimerInfo(tc.View+1)).Return().Once() - s.notifier.On("OnTcTriggeredViewChange", tc, tc.View+1).Return().Once() + s.notifier.On("OnTcTriggeredViewChange", s.livenessData.CurrentView, tc.View+1, tc).Return().Once() s.notifier.On("OnViewChange", s.livenessData.CurrentView, tc.View+1).Once() nve, err := s.paceMaker.ProcessTC(tc) require.NoError(s.T(), err) @@ -154,7 +154,7 @@ func (s *ActivePaceMakerTestSuite) TestProcessTC_SkipIncreaseViewThroughTC() { } 
s.persist.On("PutLivenessData", expectedLivenessData).Return(nil).Once() s.notifier.On("OnStartingTimeout", expectedTimerInfo(tc.View+1)).Return().Once() - s.notifier.On("OnTcTriggeredViewChange", tc, tc.View+1).Return().Once() + s.notifier.On("OnTcTriggeredViewChange", s.livenessData.CurrentView, tc.View+1, tc).Return().Once() s.notifier.On("OnViewChange", s.livenessData.CurrentView, tc.View+1).Once() nve, err = s.paceMaker.ProcessTC(tc) require.NoError(s.T(), err) @@ -211,8 +211,8 @@ func (s *ActivePaceMakerTestSuite) TestProcessQC_InvalidatesLastViewTC() { helper.WithTCNewestQC(s.livenessData.NewestQC)) s.persist.On("PutLivenessData", mock.Anything).Return(nil).Times(2) s.notifier.On("OnStartingTimeout", mock.Anything).Return().Times(2) - s.notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything).Return().Once() - s.notifier.On("OnQcTriggeredViewChange", mock.Anything, mock.Anything).Return().Once() + s.notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything, mock.Anything).Return().Once() + s.notifier.On("OnQcTriggeredViewChange", mock.Anything, mock.Anything, mock.Anything).Return().Once() s.notifier.On("OnViewChange", s.livenessData.CurrentView, tc.View+1).Once() nve, err := s.paceMaker.ProcessTC(tc) require.NotNil(s.T(), nve) @@ -242,7 +242,7 @@ func (s *ActivePaceMakerTestSuite) TestProcessQC_IgnoreOldQC() { func (s *ActivePaceMakerTestSuite) TestProcessQC_UpdateNewestQC() { s.persist.On("PutLivenessData", mock.Anything).Return(nil).Once() s.notifier.On("OnStartingTimeout", mock.Anything).Return().Once() - s.notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything).Return().Once() + s.notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything, mock.Anything).Return().Once() tc := helper.MakeTC(helper.WithTCView(s.livenessData.CurrentView+10), helper.WithTCNewestQC(s.livenessData.NewestQC)) s.notifier.On("OnViewChange", s.livenessData.CurrentView, tc.View+1).Once() @@ -269,7 +269,7 @@ func (s *ActivePaceMakerTestSuite) TestProcessQC_UpdateNewestQC() { func (s *ActivePaceMakerTestSuite) TestProcessTC_UpdateNewestQC() { s.persist.On("PutLivenessData", mock.Anything).Return(nil).Once() s.notifier.On("OnStartingTimeout", mock.Anything).Return().Once() - s.notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything).Return().Once() + s.notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything, mock.Anything).Return().Once() tc := helper.MakeTC(helper.WithTCView(s.livenessData.CurrentView+10), helper.WithTCNewestQC(s.livenessData.NewestQC)) s.notifier.On("OnViewChange", s.livenessData.CurrentView, tc.View+1).Once() diff --git a/consensus/hotstuff/timeoutaggregator/timeout_aggregator.go b/consensus/hotstuff/timeoutaggregator/timeout_aggregator.go index da6f2c277fc..ae308c42048 100644 --- a/consensus/hotstuff/timeoutaggregator/timeout_aggregator.go +++ b/consensus/hotstuff/timeoutaggregator/timeout_aggregator.go @@ -63,7 +63,7 @@ func NewTimeoutAggregator(log zerolog.Logger, } aggregator := &TimeoutAggregator{ - log: log.With().Str("component", "timeout_aggregator").Logger(), + log: log.With().Str("component", "hotstuff.timeout_aggregator").Logger(), hotstuffMetrics: hotstuffMetrics, engineMetrics: engineMetrics, notifier: notifier, diff --git a/consensus/hotstuff/timeoutcollector/factory.go b/consensus/hotstuff/timeoutcollector/factory.go index ad407b8e4ae..e76c441d1ec 100644 --- a/consensus/hotstuff/timeoutcollector/factory.go +++ b/consensus/hotstuff/timeoutcollector/factory.go @@ -3,12 +3,15 @@ package timeoutcollector import 
( "fmt" + "github.com/rs/zerolog" + "github.com/onflow/flow-go/consensus/hotstuff" ) // TimeoutCollectorFactory implements hotstuff.TimeoutCollectorFactory, it is responsible for creating timeout collector // for given view. type TimeoutCollectorFactory struct { + log zerolog.Logger notifier hotstuff.Consumer collectorNotifier hotstuff.TimeoutCollectorConsumer processorFactory hotstuff.TimeoutProcessorFactory @@ -18,11 +21,13 @@ var _ hotstuff.TimeoutCollectorFactory = (*TimeoutCollectorFactory)(nil) // NewTimeoutCollectorFactory creates new instance of TimeoutCollectorFactory. // No error returns are expected during normal operations. -func NewTimeoutCollectorFactory(notifier hotstuff.Consumer, +func NewTimeoutCollectorFactory(log zerolog.Logger, + notifier hotstuff.Consumer, collectorNotifier hotstuff.TimeoutCollectorConsumer, createProcessor hotstuff.TimeoutProcessorFactory, ) *TimeoutCollectorFactory { return &TimeoutCollectorFactory{ + log: log, notifier: notifier, collectorNotifier: collectorNotifier, processorFactory: createProcessor, @@ -39,12 +44,13 @@ func (f *TimeoutCollectorFactory) Create(view uint64) (hotstuff.TimeoutCollector if err != nil { return nil, fmt.Errorf("could not create TimeoutProcessor at view %d: %w", view, err) } - return NewTimeoutCollector(view, f.notifier, f.collectorNotifier, processor), nil + return NewTimeoutCollector(f.log, view, f.notifier, f.collectorNotifier, processor), nil } // TimeoutProcessorFactory implements hotstuff.TimeoutProcessorFactory, it is responsible for creating timeout processor // for given view. type TimeoutProcessorFactory struct { + log zerolog.Logger committee hotstuff.Replicas notifier hotstuff.TimeoutCollectorConsumer validator hotstuff.Validator @@ -56,12 +62,14 @@ var _ hotstuff.TimeoutProcessorFactory = (*TimeoutProcessorFactory)(nil) // NewTimeoutProcessorFactory creates new instance of TimeoutProcessorFactory. // No error returns are expected during normal operations. func NewTimeoutProcessorFactory( + log zerolog.Logger, notifier hotstuff.TimeoutCollectorConsumer, committee hotstuff.Replicas, validator hotstuff.Validator, domainSeparationTag string, ) *TimeoutProcessorFactory { return &TimeoutProcessorFactory{ + log: log, committee: committee, notifier: notifier, validator: validator, @@ -85,5 +93,5 @@ func (f *TimeoutProcessorFactory) Create(view uint64) (hotstuff.TimeoutProcessor return nil, fmt.Errorf("could not create TimeoutSignatureAggregator at view %d: %w", view, err) } - return NewTimeoutProcessor(f.committee, f.validator, sigAggregator, f.notifier) + return NewTimeoutProcessor(f.log, f.committee, f.validator, sigAggregator, f.notifier) } diff --git a/consensus/hotstuff/timeoutcollector/timeout_collector.go b/consensus/hotstuff/timeoutcollector/timeout_collector.go index fb9563326aa..28a9dc6f2d6 100644 --- a/consensus/hotstuff/timeoutcollector/timeout_collector.go +++ b/consensus/hotstuff/timeoutcollector/timeout_collector.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" + "github.com/rs/zerolog" + "github.com/onflow/flow-go/consensus/hotstuff" "github.com/onflow/flow-go/consensus/hotstuff/model" "github.com/onflow/flow-go/engine/consensus/sealing/counters" @@ -14,6 +16,7 @@ import ( // their view is newer than any QC or TC previously known to the TimeoutCollector. // This module is safe to use in concurrent environment. 
type TimeoutCollector struct { + log zerolog.Logger notifier hotstuff.Consumer timeoutsCache *TimeoutObjectsCache // cache for tracking double timeout and timeout equivocation collectorNotifier hotstuff.TimeoutCollectorConsumer @@ -25,12 +28,17 @@ type TimeoutCollector struct { var _ hotstuff.TimeoutCollector = (*TimeoutCollector)(nil) // NewTimeoutCollector creates new instance of TimeoutCollector -func NewTimeoutCollector(view uint64, +func NewTimeoutCollector(log zerolog.Logger, + view uint64, notifier hotstuff.Consumer, collectorNotifier hotstuff.TimeoutCollectorConsumer, processor hotstuff.TimeoutProcessor, ) *TimeoutCollector { return &TimeoutCollector{ + log: log.With(). + Str("component", "hotstuff.timeout_collector"). + Uint64("view", view). + Logger(), notifier: notifier, timeoutsCache: NewTimeoutObjectsCache(view), processor: processor, @@ -76,13 +84,15 @@ func (c *TimeoutCollector) AddTimeout(timeout *model.TimeoutObject) error { func (c *TimeoutCollector) processTimeout(timeout *model.TimeoutObject) error { err := c.processor.Process(timeout) if err != nil { - if model.IsInvalidTimeoutError(err) { - c.notifier.OnInvalidTimeoutDetected(timeout) + if invalidTimeoutErr, ok := model.AsInvalidTimeoutError(err); ok { + c.notifier.OnInvalidTimeoutDetected(*invalidTimeoutErr) return nil } return fmt.Errorf("internal error while processing timeout: %w", err) } + c.notifier.OnTimeoutProcessed(timeout) + // In the following, we emit notifications about new QCs, if their view is newer than any QC previously // known to the TimeoutCollector. Note that our implementation only provides weak ordering: // * Over larger time scales, the emitted events are for statistically increasing views. diff --git a/consensus/hotstuff/timeoutcollector/timeout_collector_test.go b/consensus/hotstuff/timeoutcollector/timeout_collector_test.go index e36eae42e19..691209cb179 100644 --- a/consensus/hotstuff/timeoutcollector/timeout_collector_test.go +++ b/consensus/hotstuff/timeoutcollector/timeout_collector_test.go @@ -43,7 +43,7 @@ func (s *TimeoutCollectorTestSuite) SetupTest() { s.collectorNotifier.On("OnNewQcDiscovered", mock.Anything).Maybe() s.collectorNotifier.On("OnNewTcDiscovered", mock.Anything).Maybe() - s.collector = NewTimeoutCollector(s.view, s.notifier, s.collectorNotifier, s.processor) + s.collector = NewTimeoutCollector(unittest.Logger(), s.view, s.notifier, s.collectorNotifier, s.processor) } // TestView tests that `View` returns the same value that was passed in constructor @@ -60,6 +60,7 @@ func (s *TimeoutCollectorTestSuite) TestAddTimeout_HappyPath() { go func() { defer wg.Done() timeout := helper.TimeoutObjectFixture(helper.WithTimeoutObjectView(s.view)) + s.notifier.On("OnTimeoutProcessed", timeout).Once() s.processor.On("Process", timeout).Return(nil).Once() err := s.collector.AddTimeout(timeout) require.NoError(s.T(), err) @@ -74,6 +75,7 @@ func (s *TimeoutCollectorTestSuite) TestAddTimeout_HappyPath() { // double timeout to notifier which can be slashed later. func (s *TimeoutCollectorTestSuite) TestAddTimeout_DoubleTimeout() { timeout := helper.TimeoutObjectFixture(helper.WithTimeoutObjectView(s.view)) + s.notifier.On("OnTimeoutProcessed", timeout).Once() s.processor.On("Process", timeout).Return(nil).Once() err := s.collector.AddTimeout(timeout) require.NoError(s.T(), err) @@ -92,6 +94,7 @@ func (s *TimeoutCollectorTestSuite) TestAddTimeout_DoubleTimeout() { // TestAddTimeout_RepeatedTimeout checks that repeated timeouts are silently dropped without any errors. 
func (s *TimeoutCollectorTestSuite) TestAddTimeout_RepeatedTimeout() { timeout := helper.TimeoutObjectFixture(helper.WithTimeoutObjectView(s.view)) + s.notifier.On("OnTimeoutProcessed", timeout).Once() s.processor.On("Process", timeout).Return(nil).Once() err := s.collector.AddTimeout(timeout) require.NoError(s.T(), err) @@ -116,11 +119,14 @@ func (s *TimeoutCollectorTestSuite) TestAddTimeout_InvalidTimeout() { s.Run("invalid-timeout", func() { timeout := helper.TimeoutObjectFixture(helper.WithTimeoutObjectView(s.view)) s.processor.On("Process", timeout).Return(model.NewInvalidTimeoutErrorf(timeout, "")).Once() - s.notifier.On("OnInvalidTimeoutDetected", timeout).Once() + s.notifier.On("OnInvalidTimeoutDetected", mock.Anything).Run(func(args mock.Arguments) { + invalidTimeoutErr := args.Get(0).(model.InvalidTimeoutError) + require.Equal(s.T(), timeout, invalidTimeoutErr.Timeout) + }).Once() err := s.collector.AddTimeout(timeout) require.NoError(s.T(), err) - s.notifier.AssertCalled(s.T(), "OnInvalidTimeoutDetected", timeout) + s.notifier.AssertCalled(s.T(), "OnInvalidTimeoutDetected", mock.Anything) }) s.Run("process-exception", func() { exception := errors.New("invalid-signature") @@ -161,6 +167,7 @@ func (s *TimeoutCollectorTestSuite) TestAddTimeout_TONotifications() { timeout.LastViewTC = lastViewTC }) timeouts = append(timeouts, timeout) + s.notifier.On("OnTimeoutProcessed", timeout).Once() s.processor.On("Process", timeout).Return(nil).Once() } diff --git a/consensus/hotstuff/timeoutcollector/timeout_processor.go b/consensus/hotstuff/timeoutcollector/timeout_processor.go index 21e6bd0975d..60f0e785359 100644 --- a/consensus/hotstuff/timeoutcollector/timeout_processor.go +++ b/consensus/hotstuff/timeoutcollector/timeout_processor.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/rs/zerolog" "go.uber.org/atomic" "golang.org/x/exp/slices" @@ -41,6 +42,7 @@ func (t *accumulatedWeightTracker) Track(weight uint64) bool { // TimeoutProcessor will create a timeout certificate which can be used to advance round. // Concurrency safe. type TimeoutProcessor struct { + log zerolog.Logger view uint64 validator hotstuff.Validator committee hotstuff.Replicas @@ -58,7 +60,8 @@ var _ hotstuff.TimeoutProcessor = (*TimeoutProcessor)(nil) // - model.ErrViewForUnknownEpoch if no epoch containing the given view is known // // All other errors should be treated as exceptions. -func NewTimeoutProcessor(committee hotstuff.Replicas, +func NewTimeoutProcessor(log zerolog.Logger, + committee hotstuff.Replicas, validator hotstuff.Validator, sigAggregator hotstuff.TimeoutSignatureAggregator, notifier hotstuff.TimeoutCollectorConsumer, @@ -73,6 +76,10 @@ func NewTimeoutProcessor(committee hotstuff.Replicas, return nil, fmt.Errorf("could not retrieve timeout weight threshold for view %d: %w", view, err) } return &TimeoutProcessor{ + log: log.With(). + Str("component", "hotstuff.timeout_processor"). + Uint64("view", view). + Logger(), view: view, committee: committee, validator: validator, @@ -139,6 +146,7 @@ func (p *TimeoutProcessor) Process(timeout *model.TimeoutObject) error { // It does _not necessarily_ imply that the timeout is invalid or the sender is equivocating. 
return fmt.Errorf("adding signature to aggregator failed: %w", err) } + p.log.Debug().Msgf("processed timeout, total weight=(%d), required=(%d)", totalWeight, p.tcTracker.minRequiredWeight) if p.partialTCTracker.Track(totalWeight) { p.notifier.OnPartialTcCreated(p.view, p.newestQCTracker.NewestQC(), timeout.LastViewTC) diff --git a/consensus/hotstuff/timeoutcollector/timeout_processor_test.go b/consensus/hotstuff/timeoutcollector/timeout_processor_test.go index e313c1fdcab..b37188c5857 100644 --- a/consensus/hotstuff/timeoutcollector/timeout_processor_test.go +++ b/consensus/hotstuff/timeoutcollector/timeout_processor_test.go @@ -75,7 +75,9 @@ func (s *TimeoutProcessorTestSuite) SetupTest() { return s.totalWeight.Load() }).Maybe() - s.processor, err = NewTimeoutProcessor(s.committee, + s.processor, err = NewTimeoutProcessor( + unittest.Logger(), + s.committee, s.validator, s.sigAggregator, s.notifier, @@ -527,7 +529,7 @@ func TestTimeoutProcessor_BuildVerifyTC(t *testing.T) { notifier := mocks.NewTimeoutCollectorConsumer(t) notifier.On("OnPartialTcCreated", view, olderQC, (*flow.TimeoutCertificate)(nil)).Return().Once() notifier.On("OnTcConstructedFromTimeouts", mock.Anything).Run(onTCCreated).Return().Once() - processor, err := NewTimeoutProcessor(committee, validator, aggregator, notifier) + processor, err := NewTimeoutProcessor(unittest.Logger(), committee, validator, aggregator, notifier) require.NoError(t, err) // last view was successful, no lastViewTC in this case @@ -548,7 +550,7 @@ func TestTimeoutProcessor_BuildVerifyTC(t *testing.T) { notifier = mocks.NewTimeoutCollectorConsumer(t) notifier.On("OnPartialTcCreated", view+1, newestQC, (*flow.TimeoutCertificate)(nil)).Return().Once() notifier.On("OnTcConstructedFromTimeouts", mock.Anything).Run(onTCCreated).Return().Once() - processor, err = NewTimeoutProcessor(committee, validator, aggregator, notifier) + processor, err = NewTimeoutProcessor(unittest.Logger(), committee, validator, aggregator, notifier) require.NoError(t, err) // part of committee will use QC, another part TC, this will result in aggregated signature consisting diff --git a/consensus/hotstuff/validator/validator.go b/consensus/hotstuff/validator/validator.go index d995288883e..f52366ad540 100644 --- a/consensus/hotstuff/validator/validator.go +++ b/consensus/hotstuff/validator/validator.go @@ -348,8 +348,7 @@ func newInvalidTCError(tc *flow.TimeoutCertificate, err error) error { func newInvalidVoteError(vote *model.Vote, err error) error { return model.InvalidVoteError{ - VoteID: vote.ID(), - View: vote.View, - Err: err, + Vote: vote, + Err: err, } } diff --git a/consensus/hotstuff/voteaggregator/vote_aggregator.go b/consensus/hotstuff/voteaggregator/vote_aggregator.go index 3fc3eb68945..6f0063f0037 100644 --- a/consensus/hotstuff/voteaggregator/vote_aggregator.go +++ b/consensus/hotstuff/voteaggregator/vote_aggregator.go @@ -75,7 +75,7 @@ func NewVoteAggregator( } aggregator := &VoteAggregator{ - log: log, + log: log.With().Str("component", "hotstuff.vote_aggregator").Logger(), hotstuffMetrics: hotstuffMetrics, engineMetrics: engineMetrics, notifier: notifier, diff --git a/consensus/hotstuff/votecollector/combined_vote_processor_v2.go b/consensus/hotstuff/votecollector/combined_vote_processor_v2.go index 0c109c016cc..69d6fb350af 100644 --- a/consensus/hotstuff/votecollector/combined_vote_processor_v2.go +++ b/consensus/hotstuff/votecollector/combined_vote_processor_v2.go @@ -234,7 +234,9 @@ func (p *CombinedVoteProcessorV2) Process(vote *model.Vote) error { } // 
checking of conditions for building QC are satisfied - if p.stakingSigAggtor.TotalWeight() < p.minRequiredWeight { + totalWeight := p.stakingSigAggtor.TotalWeight() + p.log.Debug().Msgf("processed vote, total weight=(%d), required=(%d)", totalWeight, p.minRequiredWeight) + if totalWeight < p.minRequiredWeight { return nil } if !p.rbRector.EnoughShares() { @@ -258,7 +260,7 @@ func (p *CombinedVoteProcessorV2) Process(vote *model.Vote) error { p.log.Info(). Uint64("view", qc.View). Hex("signers", qc.SignerIndices). - Msg("new qc has been created") + Msg("new QC has been created") p.onQCCreated(qc) diff --git a/consensus/hotstuff/votecollector/combined_vote_processor_v3.go b/consensus/hotstuff/votecollector/combined_vote_processor_v3.go index b1d5d3d765a..1a2bdf72fee 100644 --- a/consensus/hotstuff/votecollector/combined_vote_processor_v3.go +++ b/consensus/hotstuff/votecollector/combined_vote_processor_v3.go @@ -258,7 +258,7 @@ func (p *CombinedVoteProcessorV3) Process(vote *model.Vote) error { p.log.Info(). Uint64("view", qc.View). Hex("signers", qc.SignerIndices). - Msg("new qc has been created") + Msg("new QC has been created") p.onQCCreated(qc) diff --git a/consensus/hotstuff/votecollector/staking_vote_processor.go b/consensus/hotstuff/votecollector/staking_vote_processor.go index 80ca2ea494f..a470d97bc67 100644 --- a/consensus/hotstuff/votecollector/staking_vote_processor.go +++ b/consensus/hotstuff/votecollector/staking_vote_processor.go @@ -60,7 +60,7 @@ func (f *stakingVoteProcessorFactoryBase) Create(log zerolog.Logger, block *mode } return &StakingVoteProcessor{ - log: log, + log: log.With().Hex("block_id", block.BlockID[:]).Logger(), block: block, stakingSigAggtor: stakingSigAggtor, onQCCreated: f.onQCCreated, @@ -139,6 +139,8 @@ func (p *StakingVoteProcessor) Process(vote *model.Vote) error { return fmt.Errorf("unexpected exception adding signature from vote %x to staking aggregator: %w", vote.ID(), err) } + p.log.Debug().Msgf("processed vote, total weight=(%d), required=(%d)", totalWeight, p.minRequiredWeight) + // checking of conditions for building QC are satisfied if totalWeight < p.minRequiredWeight { return nil @@ -153,6 +155,11 @@ func (p *StakingVoteProcessor) Process(vote *model.Vote) error { if err != nil { return fmt.Errorf("internal error constructing QC from votes: %w", err) } + + p.log.Info(). + Uint64("view", qc.View). + Hex("signers", qc.SignerIndices). + Msg("new QC has been created") p.onQCCreated(qc) return nil diff --git a/consensus/hotstuff/votecollector/statemachine.go b/consensus/hotstuff/votecollector/statemachine.go index cb1d5c85a80..6b7173196ab 100644 --- a/consensus/hotstuff/votecollector/statemachine.go +++ b/consensus/hotstuff/votecollector/statemachine.go @@ -63,7 +63,7 @@ func NewStateMachine( verifyingVoteProcessorFactory VerifyingVoteProcessorFactory, ) *VoteCollector { log = log.With(). - Str("hotstuff", "VoteCollector"). + Str("component", "hotstuff.vote_collector"). Uint64("view", view). Logger() sm := &VoteCollector{ @@ -130,14 +130,15 @@ func (m *VoteCollector) processVote(vote *model.Vote) error { currentState := processor.Status() err := processor.Process(vote) if err != nil { - if model.IsInvalidVoteError(err) { - m.notifier.OnInvalidVoteDetected(vote) + if invalidVoteErr, ok := model.AsInvalidVoteError(err); ok { + m.notifier.OnInvalidVoteDetected(*invalidVoteErr) return nil } // ATTENTION: due to how our logic is designed this situation is only possible // where we receive the same vote twice, this is not a case of double voting. 
// This scenario is possible if leader submits his vote additionally to the vote in proposal. if model.IsDuplicatedSignerError(err) { + m.log.Debug().Msgf("duplicated signer %x", vote.SignerID) return nil } return err @@ -147,6 +148,7 @@ func (m *VoteCollector) processVote(vote *model.Vote) error { continue } + m.notifier.OnVoteProcessed(vote) return nil } } @@ -274,7 +276,9 @@ func (m *VoteCollector) terminateVoteProcessing() { // processCachedVotes feeds all cached votes into the VoteProcessor func (m *VoteCollector) processCachedVotes(block *model.Block) { - for _, vote := range m.votesCache.All() { + cachedVotes := m.votesCache.All() + m.log.Info().Msgf("processing %d cached votes", len(cachedVotes)) + for _, vote := range cachedVotes { if vote.BlockID != block.BlockID { continue } diff --git a/consensus/hotstuff/votecollector/statemachine_test.go b/consensus/hotstuff/votecollector/statemachine_test.go index d31c7bdce49..8ad19e98903 100644 --- a/consensus/hotstuff/votecollector/statemachine_test.go +++ b/consensus/hotstuff/votecollector/statemachine_test.go @@ -8,6 +8,7 @@ import ( "github.com/gammazero/workerpool" "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -121,6 +122,7 @@ func (s *StateMachineTestSuite) TestAddVote_VerifyingState() { require.NoError(s.T(), err) s.T().Run("add-valid-vote", func(t *testing.T) { vote := unittest.VoteForBlockFixture(block) + s.notifier.On("OnVoteProcessed", vote).Once() processor.On("Process", vote).Return(nil).Once() err := s.collector.AddVote(vote) require.NoError(t, err) @@ -128,6 +130,7 @@ func (s *StateMachineTestSuite) TestAddVote_VerifyingState() { }) s.T().Run("add-double-vote", func(t *testing.T) { firstVote := unittest.VoteForBlockFixture(block) + s.notifier.On("OnVoteProcessed", firstVote).Once() processor.On("Process", firstVote).Return(nil).Once() err := s.collector.AddVote(firstVote) require.NoError(t, err) @@ -149,18 +152,22 @@ func (s *StateMachineTestSuite) TestAddVote_VerifyingState() { s.T().Run("add-invalid-vote", func(t *testing.T) { vote := unittest.VoteForBlockFixture(block, unittest.WithVoteView(s.view)) processor.On("Process", vote).Return(model.NewInvalidVoteErrorf(vote, "")).Once() - - s.notifier.On("OnInvalidVoteDetected", vote).Return(nil).Once() + s.notifier.On("OnVoteProcessed", vote).Once() + s.notifier.On("OnInvalidVoteDetected", mock.Anything).Run(func(args mock.Arguments) { + invalidVoteErr := args.Get(0).(model.InvalidVoteError) + require.Equal(s.T(), vote, invalidVoteErr.Vote) + }).Return(nil).Once() err := s.collector.AddVote(vote) // in case process returns model.InvalidVoteError we should silently ignore this error require.NoError(t, err) // but should get notified about invalid vote - s.notifier.AssertCalled(t, "OnInvalidVoteDetected", vote) + s.notifier.AssertCalled(t, "OnInvalidVoteDetected", mock.Anything) processor.AssertCalled(t, "Process", vote) }) s.T().Run("add-repeated-vote", func(t *testing.T) { vote := unittest.VoteForBlockFixture(block) + s.notifier.On("OnVoteProcessed", vote).Once() processor.On("Process", vote).Return(nil).Once() err := s.collector.AddVote(vote) require.NoError(t, err) @@ -202,7 +209,9 @@ func (s *StateMachineTestSuite) TestProcessBlock_ProcessingOfCachedVotes() { processor := s.prepareMockedProcessor(proposal) for i := 0; i < votes; i++ { vote := unittest.VoteForBlockFixture(block) - // eventually it has to be process by processor + // once when caching vote, and once when processing 
cached vote + s.notifier.On("OnVoteProcessed", vote).Twice() + // eventually it has to be processed by processor processor.On("Process", vote).Return(nil).Once() require.NoError(s.T(), s.collector.AddVote(vote)) } diff --git a/consensus/integration/nodes_test.go b/consensus/integration/nodes_test.go index b9fbf5c0777..b3f90233c4f 100644 --- a/consensus/integration/nodes_test.go +++ b/consensus/integration/nodes_test.go @@ -514,8 +514,19 @@ func createNode( timeoutCollectorDistributor := pubsub.NewTimeoutCollectorDistributor() timeoutCollectorDistributor.AddConsumer(logConsumer) - timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory(timeoutCollectorDistributor, committee, validator, msig.ConsensusTimeoutTag) - timeoutCollectorsFactory := timeoutcollector.NewTimeoutCollectorFactory(notifier, timeoutCollectorDistributor, timeoutProcessorFactory) + timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory( + log, + timeoutCollectorDistributor, + committee, + validator, + msig.ConsensusTimeoutTag, + ) + timeoutCollectorsFactory := timeoutcollector.NewTimeoutCollectorFactory( + log, + notifier, + timeoutCollectorDistributor, + timeoutProcessorFactory, + ) timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(log, livenessData.CurrentView, timeoutCollectorsFactory) timeoutAggregator, err := timeoutaggregator.NewTimeoutAggregator( diff --git a/engine/access/access_test.go b/engine/access/access_test.go index 77a61dd3f26..cf4c666b959 100644 --- a/engine/access/access_test.go +++ b/engine/access/access_test.go @@ -48,6 +48,7 @@ type Suite struct { state *protocol.State snapshot *protocol.Snapshot epochQuery *protocol.EpochQuery + params *protocol.Params signerIndicesDecoder *hsmock.BlockSignerDecoder signerIds flow.IdentifierList log zerolog.Logger @@ -56,6 +57,7 @@ type Suite struct { collClient *accessmock.AccessAPIClient execClient *accessmock.ExecutionAPIClient me *module.Local + rootBlock *flow.Header chainID flow.ChainID metrics *metrics.NoopCollector backend *backend.Backend @@ -78,10 +80,11 @@ func (suite *Suite) SetupTest() { suite.state.On("Final").Return(suite.snapshot, nil).Maybe() suite.snapshot.On("Epochs").Return(suite.epochQuery).Maybe() - header := unittest.BlockHeaderFixture() - params := new(protocol.Params) - params.On("Root").Return(header, nil) - suite.state.On("Params").Return(params).Maybe() + suite.rootBlock = unittest.BlockHeaderFixture(unittest.WithHeaderHeight(0)) + suite.params = new(protocol.Params) + suite.params.On("Root").Return(suite.rootBlock, nil) + suite.params.On("SporkRootBlockHeight").Return(suite.rootBlock.Height, nil) + suite.state.On("Params").Return(suite.params).Maybe() suite.collClient = new(accessmock.AccessAPIClient) suite.execClient = new(accessmock.ExecutionAPIClient) @@ -862,9 +865,10 @@ func (suite *Suite) createChain() (flow.Block, flow.Collection) { epoch.On("ClusterByChainID", mock.Anything).Return(cluster, nil) epochs := new(protocol.EpochQuery) epochs.On("Current").Return(epoch) - snap := new(protocol.Snapshot) - snap.On("Epochs").Return(epochs) - snap.On("SealingSegment").Return(&flow.SealingSegment{Blocks: unittest.BlockFixtures(2)}, nil).Maybe() + snap := protocol.NewSnapshot(suite.T()) + snap.On("Epochs").Return(epochs).Maybe() + snap.On("Params").Return(suite.params).Maybe() + snap.On("Head").Return(block.Header, nil).Maybe() suite.state.On("AtBlockID", mock.Anything).Return(snap).Once() // initial height lookup in ingestion engine suite.state.On("AtBlockID", refBlockID).Return(snap) diff --git 
a/engine/access/rpc/backend/backend_network.go b/engine/access/rpc/backend/backend_network.go index 78205f5d986..c3bdaf92c93 100644 --- a/engine/access/rpc/backend/backend_network.go +++ b/engine/access/rpc/backend/backend_network.go @@ -73,7 +73,7 @@ func (b *backendNetwork) getValidSnapshot(snapshot protocol.Snapshot, blocksVisi return nil, fmt.Errorf("failed to get counter and phase at highest block in the segment: %w", err) } - counterAtLowest, phaseAtLowest, err := b.getCounterAndPhase(segment.Lowest().Header.Height) + counterAtLowest, phaseAtLowest, err := b.getCounterAndPhase(segment.Sealed().Header.Height) if err != nil { return nil, fmt.Errorf("failed to get counter and phase at lowest block in the segment: %w", err) } diff --git a/engine/collection/epochmgr/factories/epoch.go b/engine/collection/epochmgr/factories/epoch.go index 8cf71ad4cd9..ca5bb9b03e4 100644 --- a/engine/collection/epochmgr/factories/epoch.go +++ b/engine/collection/epochmgr/factories/epoch.go @@ -141,6 +141,7 @@ func (factory *EpochComponentsFactory) Create( validator := hotstuffModules.Validator hotstuff, err = factory.hotstuff.Create( + cluster, state, metrics, builder, diff --git a/engine/collection/epochmgr/factories/hotstuff.go b/engine/collection/epochmgr/factories/hotstuff.go index b15f630426f..d100cd65df7 100644 --- a/engine/collection/epochmgr/factories/hotstuff.go +++ b/engine/collection/epochmgr/factories/hotstuff.go @@ -30,7 +30,7 @@ import ( type HotStuffMetricsFunc func(chainID flow.ChainID) module.HotstuffMetrics type HotStuffFactory struct { - log zerolog.Logger + baseLogger zerolog.Logger me module.Local db *badger.DB protoState protocol.State @@ -52,7 +52,7 @@ func NewHotStuffFactory( ) (*HotStuffFactory, error) { factory := &HotStuffFactory{ - log: log, + baseLogger: log, me: me, db: db, protoState: protoState, @@ -72,15 +72,15 @@ func (f *HotStuffFactory) CreateModules( payloads storage.ClusterPayloads, updater module.Finalizer, ) (*consensus.HotstuffModules, module.HotstuffMetrics, error) { - // setup metrics/logging with the new chain ID + log := f.createLogger(cluster) metrics := f.createMetrics(cluster.ChainID()) notifier := pubsub.NewDistributor() finalizationDistributor := pubsub.NewFinalizationDistributor() notifier.AddConsumer(finalizationDistributor) - notifier.AddConsumer(notifications.NewLogConsumer(f.log)) + notifier.AddConsumer(notifications.NewLogConsumer(log)) notifier.AddConsumer(hotmetrics.NewMetricsConsumer(metrics)) - notifier.AddConsumer(notifications.NewTelemetryConsumer(f.log, cluster.ChainID())) + notifier.AddConsumer(notifications.NewTelemetryConsumer(log)) var ( err error @@ -119,7 +119,7 @@ func (f *HotStuffFactory) CreateModules( validator := validatorImpl.NewMetricsWrapper(validatorImpl.New(committee, verifier), metrics) voteProcessorFactory := votecollector.NewStakingVoteProcessorFactory(committee, qcDistributor.OnQcConstructedFromVotes) voteAggregator, err := consensus.NewVoteAggregator( - f.log, + log, metrics, f.engineMetrics, f.mempoolMetrics, @@ -135,10 +135,10 @@ func (f *HotStuffFactory) CreateModules( } timeoutCollectorDistributor := pubsub.NewTimeoutCollectorDistributor() - timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory(timeoutCollectorDistributor, committee, validator, msig.CollectorTimeoutTag) + timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory(log, timeoutCollectorDistributor, committee, validator, msig.CollectorTimeoutTag) timeoutAggregator, err := consensus.NewTimeoutAggregator( - f.log, + log, metrics, 
f.engineMetrics, f.mempoolMetrics, @@ -167,6 +167,7 @@ func (f *HotStuffFactory) CreateModules( } func (f *HotStuffFactory) Create( + cluster protocol.Cluster, clusterState cluster.State, metrics module.HotstuffMetrics, builder module.Builder, @@ -182,8 +183,9 @@ func (f *HotStuffFactory) Create( return nil, err } + log := f.createLogger(cluster) participant, err := consensus.NewParticipant( - f.log, + log, metrics, builder, finalizedBlock, @@ -193,3 +195,8 @@ func (f *HotStuffFactory) Create( ) return participant, err } + +// createLogger creates a logger by wrapping base logger by decorating it will cluster ID +func (f *HotStuffFactory) createLogger(cluster protocol.Cluster) zerolog.Logger { + return f.baseLogger.With().Str("chain", cluster.ChainID().String()).Logger() +} diff --git a/integration/tests/epochs/epoch_join_and_leave_vn_test.go b/integration/tests/epochs/epoch_join_and_leave_vn_test.go index 4686a201f61..0bb79d160fb 100644 --- a/integration/tests/epochs/epoch_join_and_leave_vn_test.go +++ b/integration/tests/epochs/epoch_join_and_leave_vn_test.go @@ -6,12 +6,9 @@ import ( "github.com/stretchr/testify/suite" "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/utils/unittest" ) func TestEpochJoinAndLeaveVN(t *testing.T) { - // TODO this test is blocked by https://github.com/dapperlabs/flow-go/issues/6443 - unittest.SkipUnless(t, unittest.TEST_FLAKY, "this test is flaky due to an unhandled case in service event processing following epoch transition https://github.com/dapperlabs/flow-go/issues/6443") suite.Run(t, new(EpochJoinAndLeaveVNSuite)) } diff --git a/integration/tests/epochs/suite.go b/integration/tests/epochs/suite.go index 3ad2e73f715..18dcf7a4fbc 100644 --- a/integration/tests/epochs/suite.go +++ b/integration/tests/epochs/suite.go @@ -681,7 +681,7 @@ func (s *Suite) getLatestSealedHeader(ctx context.Context) *flow.Header { require.NoError(s.T(), err) segment, err := snapshot.SealingSegment() require.NoError(s.T(), err) - sealed := segment.Lowest() + sealed := segment.Sealed() return sealed.Header } @@ -818,8 +818,8 @@ func (s *Suite) runTestEpochJoinAndLeave(role flow.Role, checkNetworkHealth node s.TimedLogf("retrieved header after entering EpochSetup phase: root_height=%d, root_view=%d, segment_heights=[%d-%d], segment_views=[%d-%d]", header.Height, header.View, - segment.Lowest().Header.Height, segment.Highest().Header.Height, - segment.Lowest().Header.View, segment.Highest().Header.Height) + segment.Sealed().Header.Height, segment.Highest().Header.Height, + segment.Sealed().Header.View, segment.Highest().Header.Height) testContainer.WriteRootSnapshot(rootSnapshot) testContainer.Container.Start(s.ctx) diff --git a/model/flow/block.go b/model/flow/block.go index 711c09bc421..229a6059dcb 100644 --- a/model/flow/block.go +++ b/model/flow/block.go @@ -5,10 +5,7 @@ package flow func Genesis(chainID ChainID) *Block { // create the raw content for the genesis block - payload := Payload{ - Guarantees: nil, - Seals: nil, - } + payload := Payload{} // create the header header := Header{ diff --git a/model/flow/sealing_segment.go b/model/flow/sealing_segment.go index 5f4cab6c5de..48340a01e14 100644 --- a/model/flow/sealing_segment.go +++ b/model/flow/sealing_segment.go @@ -2,6 +2,8 @@ package flow import ( "fmt" + + "golang.org/x/exp/slices" ) // SealingSegment is the chain segment such that the last block (greatest height) @@ -65,6 +67,17 @@ type SealingSegment struct { // Blocks contain the chain segment blocks in ascending height order. 
Blocks []*Block + // ExtraBlocks [optional] holds ancestors of `Blocks` in ascending height order. These blocks + // are connecting to `Blocks[0]` (the lowest block of sealing segment). Formally, let `l` + // be the length of `ExtraBlocks`, then ExtraBlocks[l-1] is the _parent_ of `Blocks[0]`. + // These extra blocks are included in order to ensure that a newly bootstrapped node + // knows about all entities which might be referenced by blocks which extend from + // the sealing segment. + // ExtraBlocks are stored separately from Blocks, because not all node roles need + // the same amount of history. Blocks represents the minimal possible required history; + // ExtraBlocks represents additional role-required history. + ExtraBlocks []*Block + // ExecutionResults contain any results which are referenced by receipts // or seals in the sealing segment, but not included in any segment block // payloads. @@ -89,14 +102,22 @@ type SealingSegment struct { FirstSeal *Seal } +// Highest is the highest block in the sealing segment and the reference block from snapshot that was +// used to produce this sealing segment. func (segment *SealingSegment) Highest() *Block { return segment.Blocks[len(segment.Blocks)-1] } -func (segment *SealingSegment) Lowest() *Block { +// Sealed returns the most recently sealed block based on head of sealing segment(highest block). +func (segment *SealingSegment) Sealed() *Block { return segment.Blocks[0] } +// AllBlocks returns all blocks within the sealing segment, including extra blocks, in ascending height order. +func (segment *SealingSegment) AllBlocks() []*Block { + return append(segment.ExtraBlocks, segment.Blocks...) +} + // FinalizedSeal returns the seal that seals the lowest block. // Per specification, this seal must be included in a SealingSegment. // The SealingSegment must be validated. @@ -112,9 +133,9 @@ func (segment *SealingSegment) FinalizedSeal() (*Seal, error) { } // sanity check - if seal.BlockID != segment.Lowest().ID() { + if seal.BlockID != segment.Sealed().ID() { return nil, fmt.Errorf("finalized seal should seal the lowest block %v, but actually is to seal %v", - segment.Lowest().ID(), seal.BlockID) + segment.Sealed().ID(), seal.BlockID) } return seal, nil } @@ -168,6 +189,15 @@ func (segment *SealingSegment) Validate() error { return fmt.Errorf("invalid segment: %w", err) } } + // extra blocks should be added in reverse order, starting from the highest one since they are sorted + // in ascending order. + for i := len(segment.ExtraBlocks) - 1; i >= 0; i-- { + block := segment.ExtraBlocks[i] + err := builder.AddExtraBlock(block) + if err != nil { + return fmt.Errorf("invalid segment: %w", err) + } + } _, err := builder.SealingSegment() if err != nil { return fmt.Errorf("invalid segment: %w", err) @@ -204,11 +234,19 @@ type SealingSegmentBuilder struct { results []*ExecutionResult latestSeals map[Identifier]Identifier firstSeal *Seal + // extraBlocks included in sealing segment, must connect to the lowest block of segment + // stored in descending order for simpler population logic + extraBlocks []*Block } // AddBlock appends a block to the sealing segment under construction. // No errors are expected during normal operation. 
func (builder *SealingSegmentBuilder) AddBlock(block *Block) error { + // sanity check: all blocks have to be added before adding extra blocks + if len(builder.extraBlocks) > 0 { + return fmt.Errorf("cannot add sealing segment block after extra block is added") + } + // sanity check: block should be 1 height higher than current highest if !builder.isValidHeight(block) { return fmt.Errorf("invalid block height (%d): %w", block.Header.Height, ErrSegmentInvalidBlockHeight) @@ -274,6 +312,27 @@ return nil } +// AddExtraBlock appends an extra block to the sealing segment under construction. +// Extra blocks need to be added in descending height order, and the first one must be the parent of the lowest block +// of the sealing segment, so that they form a continuous chain. +// No errors are expected during normal operation. +func (builder *SealingSegmentBuilder) AddExtraBlock(block *Block) error { + if len(builder.extraBlocks) == 0 { + if len(builder.blocks) == 0 { + return fmt.Errorf("cannot add extra blocks before adding lowest sealing segment block") + } + // the first extra block has to be the parent of the lowest block of the sealing segment + if (block.Header.Height + 1) != builder.lowest().Header.Height { + return fmt.Errorf("invalid extra block height (%d), doesn't connect to sealing segment: %w", block.Header.Height, ErrSegmentInvalidBlockHeight) + } + } else if (block.Header.Height + 1) != builder.extraBlocks[len(builder.extraBlocks)-1].Header.Height { + return fmt.Errorf("invalid extra block height (%d), doesn't connect to last extra block: %w", block.Header.Height, ErrSegmentInvalidBlockHeight) + } + + builder.extraBlocks = append(builder.extraBlocks, block) + return nil +} + // AddExecutionResult adds result to executionResults func (builder *SealingSegmentBuilder) addExecutionResult(result *ExecutionResult) { builder.results = append(builder.results, result) @@ -290,8 +349,15 @@ func (builder *SealingSegmentBuilder) SealingSegment() (*SealingSegment, error) return nil, fmt.Errorf("failed to validate sealing segment: %w", err) } + // SealingSegment must store extra blocks in ascending order, while the builder collects them in descending order. + // Sort the slice to restore the correct (ascending) ordering.
+ slices.SortFunc(builder.extraBlocks, func(lhs, rhs *Block) bool { + return lhs.Header.Height < rhs.Header.Height + }) + return &SealingSegment{ Blocks: builder.blocks, + ExtraBlocks: builder.extraBlocks, ExecutionResults: builder.results, LatestSeals: builder.latestSeals, FirstSeal: builder.firstSeal, @@ -319,6 +385,9 @@ func (builder *SealingSegmentBuilder) validateRootSegment() error { if len(builder.blocks) == 0 { return fmt.Errorf("root segment must have at least 1 block") } + if len(builder.extraBlocks) > 0 { + return fmt.Errorf("root segment cannot have extra blocks") + } if builder.lowest().Header.View != 0 { return fmt.Errorf("root block has unexpected view (%d != 0)", builder.lowest().Header.View) } @@ -354,6 +423,12 @@ func (builder *SealingSegmentBuilder) validateSegment() error { return fmt.Errorf("expect at least 2 blocks in a sealing segment or 1 block in the case of root segments, but got an empty sealing segment: %w", ErrSegmentBlocksWrongLen) } + if len(builder.extraBlocks) > 0 { + if builder.extraBlocks[0].Header.Height+1 != builder.lowest().Header.Height { + return fmt.Errorf("extra blocks don't connect to lowest block in segment") + } + } + // if root sealing segment, use different validation if isRootSegment(builder.latestSeals) { err := builder.validateRootSegment() @@ -394,6 +469,7 @@ func NewSealingSegmentBuilder(resultLookup GetResultFunc, sealLookup GetSealByBl includedResults: make(map[Identifier]struct{}), latestSeals: make(map[Identifier]Identifier), blocks: make([]*Block, 0), + extraBlocks: make([]*Block, 0), results: make(ExecutionResultList, 0), } } diff --git a/model/flow/sealing_segment_test.go b/model/flow/sealing_segment_test.go index 00321775074..94034d25b41 100644 --- a/model/flow/sealing_segment_test.go +++ b/model/flow/sealing_segment_test.go @@ -270,7 +270,7 @@ func (suite *SealingSegmentSuite) TestBuild_RootSegment() { unittest.AssertEqualBlocksLenAndOrder(suite.T(), []*flow.Block{root}, segment.Blocks) require.Equal(suite.T(), segment.Highest().ID(), root.ID()) - require.Equal(suite.T(), segment.Lowest().ID(), root.ID()) + require.Equal(suite.T(), segment.Sealed().ID(), root.ID()) } // TestBuild_RootSegmentWrongView tests that we return ErrSegmentInvalidRootView for @@ -422,3 +422,80 @@ func TestAddBlock_StorageError(t *testing.T) { require.ErrorIs(t, err, flow.ErrSegmentSealLookup) }) } + +// TestAddExtraBlock tests different scenarios for adding extra blocks, covers happy and unhappy path scenarios. 
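Before the tests, a small self-contained sketch of the builder's ordering contract, using plain heights and a hypothetical helper rather than the real *flow.Block API: segment blocks are added first (ascending), then extra blocks in descending order, each connecting by height to what was added before; SealingSegment() later sorts the extras back into ascending order.

package main

import "fmt"

// addExtraHeight mimics the AddExtraBlock connectivity rule on plain heights:
// the first extra block must be the parent of the lowest segment block, and
// every further extra block must be the parent of the previously added one.
func addExtraHeight(lowestSegmentHeight uint64, extras []uint64, h uint64) ([]uint64, error) {
	if len(extras) == 0 {
		if h+1 != lowestSegmentHeight {
			return nil, fmt.Errorf("extra height %d does not connect to lowest segment height %d", h, lowestSegmentHeight)
		}
	} else if h+1 != extras[len(extras)-1] {
		return nil, fmt.Errorf("extra height %d does not connect to previous extra height %d", h, extras[len(extras)-1])
	}
	return append(extras, h), nil
}

func main() {
	lowest := uint64(100) // height of Blocks[0], added to the builder first
	var extras []uint64
	for _, h := range []uint64{99, 98, 97} { // descending: each new block is the parent of the previously added one
		var err error
		extras, err = addExtraHeight(lowest, extras, h)
		if err != nil {
			fmt.Println("rejected:", err)
			return
		}
	}
	fmt.Println("accepted extra heights (descending):", extras)
}

The happy-path test below exercises the same contract against the real builder.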
+func (suite *SealingSegmentSuite) TestAddExtraBlock() { + // populate sealing segment with one block + firstBlock := suite.FirstBlock() + firstBlock.Header.Height += 100 + suite.AddBlocks(firstBlock) + + suite.T().Run("empty-segment", func(t *testing.T) { + builder := flow.NewSealingSegmentBuilder(nil, nil) + block := unittest.BlockFixture() + err := builder.AddExtraBlock(&block) + require.Error(t, err) + }) + suite.T().Run("extra-block-does-not-connect", func(t *testing.T) { + // adding extra block that doesn't connect to the lowest is an error + extraBlock := unittest.BlockFixture() + extraBlock.Header.Height = firstBlock.Header.Height + 10 // make sure it doesn't connect by height + err := suite.builder.AddExtraBlock(&extraBlock) + require.ErrorIs(t, err, flow.ErrSegmentInvalidBlockHeight) + }) + suite.T().Run("extra-block-not-continuous", func(t *testing.T) { + builder := flow.NewSealingSegmentBuilder(suite.GetResult, suite.GetSealByBlockID) + err := builder.AddBlock(firstBlock) + require.NoError(t, err) + extraBlock := unittest.BlockFixture() + extraBlock.Header.Height = firstBlock.Header.Height - 1 // make it connect + err = builder.AddExtraBlock(&extraBlock) + require.NoError(t, err) + extraBlockWithSkip := unittest.BlockFixture() + extraBlockWithSkip.Header.Height = extraBlock.Header.Height - 2 // skip one height + err = builder.AddExtraBlock(&extraBlockWithSkip) + require.ErrorIs(t, err, flow.ErrSegmentInvalidBlockHeight) + }) + suite.T().Run("root-segment-extra-blocks", func(t *testing.T) { + builder := flow.NewSealingSegmentBuilder(suite.GetResult, suite.GetSealByBlockID) + err := builder.AddBlock(firstBlock) + require.NoError(t, err) + + extraBlock := unittest.BlockFixture() + extraBlock.Header.Height = firstBlock.Header.Height - 1 + err = builder.AddExtraBlock(&extraBlock) + require.NoError(t, err) + _, err = builder.SealingSegment() + // root segment cannot have extra blocks + require.Error(t, err) + }) + suite.T().Run("happy-path", func(t *testing.T) { + // add a few blocks with results and seals to form a valid sealing segment + // B1(S*) <- B2(R1) <- B3(S1) + + receipt, seal := unittest.ReceiptAndSealForBlock(firstBlock) + blockWithER := unittest.BlockWithParentFixture(firstBlock.Header) + blockWithER.SetPayload(unittest.PayloadFixture(unittest.WithReceipts(receipt))) + + // add one more block, with seal to the ER + highestBlock := unittest.BlockWithParentFixture(blockWithER.Header) + highestBlock.SetPayload(unittest.PayloadFixture(unittest.WithSeals(seal))) + + suite.AddBlocks(blockWithER, highestBlock) + + // construct two extra blocks that connect to the lowest block and add them to builder + // EB2 <- EB1 <- B1(S*) <- B2(R1) <- B3(S1) + extraBlock := unittest.BlockFixture() + extraBlock.Header.Height = firstBlock.Header.Height - 1 + err := suite.builder.AddExtraBlock(&extraBlock) + require.NoError(t, err) + secondExtraBlock := unittest.BlockFixture() + secondExtraBlock.Header.Height = extraBlock.Header.Height - 1 + err = suite.builder.AddExtraBlock(&secondExtraBlock) + require.NoError(t, err) + segment, err := suite.builder.SealingSegment() + require.NoError(t, err) + err = segment.Validate() + require.NoError(t, err) + }) +} diff --git a/module/metrics/hotstuff/consumer.go b/module/metrics/hotstuff/consumer.go index a37e8208636..0b5e0abf168 100644 --- a/module/metrics/hotstuff/consumer.go +++ b/module/metrics/hotstuff/consumer.go @@ -25,13 +25,13 @@ func NewMetricsConsumer(metrics module.HotstuffMetrics) *MetricsConsumer { } } -func (c *MetricsConsumer) 
OnQcTriggeredViewChange(qc *flow.QuorumCertificate, newView uint64) { +func (c *MetricsConsumer) OnQcTriggeredViewChange(_ uint64, newView uint64, qc *flow.QuorumCertificate) { c.metrics.SetCurView(newView) c.metrics.SetQCView(qc.View) c.metrics.CountSkipped() } -func (c *MetricsConsumer) OnTcTriggeredViewChange(tc *flow.TimeoutCertificate, newView uint64) { +func (c *MetricsConsumer) OnTcTriggeredViewChange(_ uint64, newView uint64, tc *flow.TimeoutCertificate) { c.metrics.SetCurView(newView) c.metrics.SetTCView(tc.View) c.metrics.CountTimeout() diff --git a/state/protocol/badger/config.go b/state/protocol/badger/config.go deleted file mode 100644 index e0708c699ce..00000000000 --- a/state/protocol/badger/config.go +++ /dev/null @@ -1,15 +0,0 @@ -package badger - -import ( - "github.com/onflow/flow-go/model/flow" -) - -type Config struct { - transactionExpiry uint64 // how many blocks after the reference block a transaction expires -} - -func DefaultConfig() Config { - return Config{ - transactionExpiry: flow.DefaultTransactionExpiry, - } -} diff --git a/state/protocol/badger/mutator.go b/state/protocol/badger/mutator.go index 3cb0755b8f9..b766bcf54c0 100644 --- a/state/protocol/badger/mutator.go +++ b/state/protocol/badger/mutator.go @@ -41,7 +41,6 @@ type FollowerState struct { tracer module.Tracer consumer protocol.Consumer blockTimer protocol.BlockTimer - cfg Config } // MutableState implements a mutable protocol state. When extending the @@ -69,7 +68,6 @@ func NewFollowerState( tracer: tracer, consumer: consumer, blockTimer: blockTimer, - cfg: DefaultConfig(), } return followerState, nil } @@ -265,13 +263,12 @@ func (m *MutableState) guaranteeExtend(ctx context.Context, candidate *flow.Bloc // we only look as far back for duplicates as the transaction expiry limit; // if a guarantee was included before that, we will disqualify it on the // basis of the reference block anyway - limit := header.Height - m.cfg.transactionExpiry + limit := header.Height - flow.DefaultTransactionExpiry if limit > header.Height { // overflow check limit = 0 } - - if limit < m.rootHeight { - limit = m.rootHeight + if limit < m.sporkRootBlockHeight { + limit = m.sporkRootBlockHeight } // build a list of all previously used guarantees on this part of the chain diff --git a/state/protocol/badger/params.go b/state/protocol/badger/params.go index 65f5d24612b..7f19d26234f 100644 --- a/state/protocol/badger/params.go +++ b/state/protocol/badger/params.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage/badger/operation" ) @@ -11,6 +12,8 @@ type Params struct { state *State } +var _ protocol.Params = (*Params)(nil) + func (p Params) ChainID() (flow.ChainID, error) { // retrieve root header @@ -33,6 +36,16 @@ func (p Params) SporkID() (flow.Identifier, error) { return sporkID, nil } +func (p Params) SporkRootBlockHeight() (uint64, error) { + var sporkRootBlockHeight uint64 + err := p.state.db.View(operation.RetrieveSporkRootBlockHeight(&sporkRootBlockHeight)) + if err != nil { + return 0, fmt.Errorf("could not get spork root block height: %w", err) + } + + return sporkRootBlockHeight, nil +} + func (p Params) ProtocolVersion() (uint, error) { var version uint diff --git a/state/protocol/badger/snapshot.go b/state/protocol/badger/snapshot.go index 4307c395aab..2ba7826d329 100644 --- a/state/protocol/badger/snapshot.go +++ b/state/protocol/badger/snapshot.go @@ -253,18 +253,36 @@ func (s *Snapshot) SealedResult() 
(*flow.ExecutionResult, *flow.Seal, error) { // receipt in the block's payload to make sure we have a corresponding execution result, any execution // results missing from blocks are stored in the SealingSegment.ExecutionResults field. func (s *Snapshot) SealingSegment() (*flow.SealingSegment, error) { - head, err := s.Head() - if err != nil { - return nil, fmt.Errorf("could not get snapshot reference block: %w", err) - } - if head.Height < s.state.rootHeight { + // Lets denote the highest block in the sealing segment `head` (initialized below). Formally, + // the sealing segment needs to contains enough history to satisfy _all_ of the following conditions: + // (i) The highest sealed block as of `head` needs to be included in the sealing segment. + // This is relevant if `head` does not contain any seals. + // (ii) All blocks that are sealed by `head`. This is relevant if head` contains _multiple_ seals. + // (iii) The sealing segment should contain the history back to (including): + // limitHeight := max(header.Height - flow.DefaultTransactionExpiry, SporkRootBlockHeight) + // + // Condition (i) and (ii) are necessary for the sealing segment for _any node_. In contrast, (iii) is + // necessary to bootstrap nodes that _validate_ block payloads (e.g. consensus nodes), to verify that + // collection guarantees are not duplicated (collections expire after `flow.DefaultTransactionExpiry` blocks). + // However, per convention, we include the blocks for (i) in the `SealingSegment.Blocks`, while the + // additional blocks for (ii) and optionally (iii) are contained in as `SealingSegment.ExtraBlocks`. + head, err := s.state.blocks.ByID(s.blockID) + if err != nil { + return nil, fmt.Errorf("could not get snapshot's reference block: %w", err) + } + if head.Header.Height < s.state.rootHeight { return nil, protocol.ErrSealingSegmentBelowRootBlock } + // STEP (i): highest sealed block as of `head` must be included. seal, err := s.state.seals.HighestInFork(s.blockID) if err != nil { return nil, fmt.Errorf("could not get seal for sealing segment: %w", err) } + blockSealedAtHead, err := s.state.headers.ByBlockID(seal.BlockID) + if err != nil { + return nil, fmt.Errorf("could not get block: %w", err) + } // walk through the chain backward until we reach the block referenced by // the latest seal - the returned segment includes this block @@ -283,12 +301,60 @@ func (s *Snapshot) SealingSegment() (*flow.SealingSegment, error) { return nil } - err = fork.TraverseForward(s.state.headers, s.blockID, scraper, fork.IncludingBlock(seal.BlockID)) if err != nil { return nil, fmt.Errorf("could not traverse sealing segment: %w", err) } + // STEP (ii): extend history down to the lowest block, whose seal is included in `head` + lowestSealedByHead := blockSealedAtHead + for _, sealInHead := range head.Payload.Seals { + h, e := s.state.headers.ByBlockID(sealInHead.BlockID) + if e != nil { + return nil, fmt.Errorf("could not get block (id=%x) for seal: %w", seal.BlockID, e) // storage.ErrNotFound or exception + } + if h.Height < lowestSealedByHead.Height { + lowestSealedByHead = h + } + } + + // STEP (iii): extended history to allow checking for duplicated collections, i.e. 
+ // limitHeight = max(head.Height - flow.DefaultTransactionExpiry, SporkRootBlockHeight) + limitHeight := s.state.sporkRootBlockHeight + if head.Header.Height > s.state.sporkRootBlockHeight+flow.DefaultTransactionExpiry { + limitHeight = head.Header.Height - flow.DefaultTransactionExpiry + } + + // As we have to satisfy (ii) _and_ (iii), we have to take the longest history, i.e. the lowest height. + if lowestSealedByHead.Height < limitHeight { + limitHeight = lowestSealedByHead.Height + if limitHeight < s.state.sporkRootBlockHeight { // sanity check; should never happen + return nil, fmt.Errorf("unexpected internal error: calculated history-cutoff at height %d, which is lower than the spork's root height %d", limitHeight, s.state.sporkRootBlockHeight) + } + } + if limitHeight < blockSealedAtHead.Height { + // we need to include extra blocks in sealing segment + extraBlocksScraper := func(header *flow.Header) error { + blockID := header.ID() + block, err := s.state.blocks.ByID(blockID) + if err != nil { + return fmt.Errorf("could not get block: %w", err) + } + + err = builder.AddExtraBlock(block) + if err != nil { + return fmt.Errorf("could not add block to sealing segment: %w", err) + } + + return nil + } + + err = fork.TraverseBackward(s.state.headers, blockSealedAtHead.ParentID, extraBlocksScraper, fork.IncludingHeight(limitHeight)) + if err != nil { + return nil, fmt.Errorf("could not traverse extra blocks for sealing segment: %w", err) + } + } + segment, err := builder.SealingSegment() if err != nil { return nil, fmt.Errorf("could not build sealing segment: %w", err) diff --git a/state/protocol/badger/snapshot_test.go b/state/protocol/badger/snapshot_test.go index 829f6934424..be924988867 100644 --- a/state/protocol/badger/snapshot_test.go +++ b/state/protocol/badger/snapshot_test.go @@ -16,6 +16,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/model/flow/factory" "github.com/onflow/flow-go/model/flow/filter" + "github.com/onflow/flow-go/module/signature" "github.com/onflow/flow-go/state/protocol" bprotocol "github.com/onflow/flow-go/state/protocol/badger" "github.com/onflow/flow-go/state/protocol/inmem" @@ -228,6 +229,7 @@ func TestSealingSegment(t *testing.T) { assert.Len(t, actual.ExecutionResults, 1) assert.Len(t, actual.Blocks, 1) + assert.Empty(t, actual.ExtraBlocks) unittest.AssertEqualBlocksLenAndOrder(t, expected.Blocks, actual.Blocks) assertSealingSegmentBlocksQueryableAfterBootstrap(t, state.AtBlockID(head.ID())) @@ -237,7 +239,7 @@ func TestSealingSegment(t *testing.T) { // test sealing segment for non-root segment where the latest seal is the // root seal, but the segment contains more than the root block. 
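To make the history cutoff above concrete, here is a minimal sketch (hypothetical helper, plain heights) of how limitHeight is derived: reach back one transaction-expiry window below head, never below the spork root, and reach further down when the lowest block sealed by head is older than that window. The same expiry window bounds the duplicate-guarantee lookback in guaranteeExtend, which is why payload-validating nodes need this extra history in the first place.

package main

import "fmt"

const defaultTransactionExpiry = 600 // stand-in for flow.DefaultTransactionExpiry

// historyCutoff mirrors the limitHeight computation in SealingSegment():
// limit = max(headHeight - expiry, sporkRootHeight); if the lowest block sealed
// by head is older than that, the segment must reach down to it instead
// (the real code additionally sanity-checks that this never drops below the spork root).
func historyCutoff(headHeight, sporkRootHeight, lowestSealedHeight uint64) uint64 {
	limit := sporkRootHeight
	if headHeight > sporkRootHeight+defaultTransactionExpiry {
		limit = headHeight - defaultTransactionExpiry
	}
	if lowestSealedHeight < limit {
		limit = lowestSealedHeight
	}
	return limit
}

func main() {
	// early in the spork: the window reaches back to the spork root
	fmt.Println(historyCutoff(100, 0, 95)) // 0
	// later in the spork: the window is headHeight - expiry
	fmt.Println(historyCutoff(10_000, 0, 9_950)) // 9400
	// head seals a block below the window: extend down to that block
	fmt.Println(historyCutoff(10_000, 0, 9_300)) // 9300
}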
// ROOT <- B1 - // Expected sealing segment: [ROOT, B1] + // Expected sealing segment: [ROOT, B1], extra blocks: [] t.Run("non-root with root seal as latest seal", func(t *testing.T) { util.RunWithFollowerProtocolState(t, rootSnapshot, func(db *badger.DB, state *bprotocol.FollowerState) { // build an extra block on top of root @@ -252,8 +254,9 @@ func TestSealingSegment(t *testing.T) { // sealing segment should contain B1 and B2 // B2 is reference of snapshot, B1 is latest sealed - unittest.AssertEqualBlocksLenAndOrder(t, []*flow.Block{rootSnapshot.Encodable().SealingSegment.Lowest(), block1}, segment.Blocks) + unittest.AssertEqualBlocksLenAndOrder(t, []*flow.Block{rootSnapshot.Encodable().SealingSegment.Sealed(), block1}, segment.Blocks) assert.Len(t, segment.ExecutionResults, 1) + assert.Empty(t, segment.ExtraBlocks) assertSealingSegmentBlocksQueryableAfterBootstrap(t, state.AtBlockID(block1.ID())) }) }) @@ -261,7 +264,7 @@ func TestSealingSegment(t *testing.T) { // test sealing segment for non-root segment with simple sealing structure // (no blocks in between reference block and latest sealed) // ROOT <- B1 <- B2(S1) - // Expected sealing segment: [B1, B2] + // Expected sealing segment: [B1, B2], extra blocks: [ROOT] t.Run("non-root", func(t *testing.T) { util.RunWithFollowerProtocolState(t, rootSnapshot, func(db *badger.DB, state *bprotocol.FollowerState) { // build a block to seal @@ -277,6 +280,9 @@ func TestSealingSegment(t *testing.T) { segment, err := state.AtBlockID(block2.ID()).SealingSegment() require.NoError(t, err) + require.Len(t, segment.ExtraBlocks, 1) + assert.Equal(t, segment.ExtraBlocks[0].Header.Height, head.Height) + // build a valid child B3 to ensure we have a QC buildBlock(t, state, unittest.BlockWithParentFixture(block2.Header)) @@ -291,7 +297,7 @@ func TestSealingSegment(t *testing.T) { // test sealing segment for sealing segment with a large number of blocks // between the reference block and latest sealed // ROOT <- B1 <- .... 
<- BN(S1) - // Expected sealing segment: [B1, ..., BN] + // Expected sealing segment: [B1, ..., BN], extra blocks: [ROOT] t.Run("long sealing segment", func(t *testing.T) { util.RunWithFollowerProtocolState(t, rootSnapshot, func(db *badger.DB, state *bprotocol.FollowerState) { @@ -322,6 +328,8 @@ func TestSealingSegment(t *testing.T) { assert.Len(t, segment.ExecutionResults, 1) // sealing segment should cover range [B1, BN] assert.Len(t, segment.Blocks, 102) + assert.Len(t, segment.ExtraBlocks, 1) + assert.Equal(t, segment.ExtraBlocks[0].Header.Height, head.Height) // first and last blocks should be B1, BN assert.Equal(t, block1.ID(), segment.Blocks[0].ID()) assert.Equal(t, blockN.ID(), segment.Blocks[101].ID()) @@ -332,7 +340,7 @@ func TestSealingSegment(t *testing.T) { // test sealing segment where the segment blocks contain seals for // ancestor blocks prior to the sealing segment // ROOT <- B1 <- B2(R1) <- B3 <- B4(R2, S1) <- B5 <- B6(S2) - // Expected sealing segment: [B2, B3, B4] + // Expected sealing segment: [B2, B3, B4], Extra blocks: [ROOT, B1] t.Run("overlapping sealing segment", func(t *testing.T) { util.RunWithFollowerProtocolState(t, rootSnapshot, func(db *badger.DB, state *bprotocol.FollowerState) { @@ -369,6 +377,7 @@ func TestSealingSegment(t *testing.T) { // sealing segment should be [B2, B3, B4, B5, B6] require.Len(t, segment.Blocks, 5) unittest.AssertEqualBlocksLenAndOrder(t, []*flow.Block{block2, block3, block4, block5, block6}, segment.Blocks) + unittest.AssertEqualBlocksLenAndOrder(t, []*flow.Block{block1}, segment.ExtraBlocks[1:]) require.Len(t, segment.ExecutionResults, 1) assertSealingSegmentBlocksQueryableAfterBootstrap(t, state.AtBlockID(block6.ID())) @@ -489,7 +498,7 @@ func TestSealingSegment(t *testing.T) { // Test the case where the reference block of the snapshot contains no seal. // We should consider the latest seal in a prior block. // ROOT <- B1 <- B2(R1) <- B3 <- B4(S1) <- B5 - // Expected sealing segment: [B1, B2, B3, B4, B5] + // Expected sealing segment: [B1, B2, B3, B4, B5], Extra blocks: [ROOT] t.Run("sealing segment where highest block in segment does not seal lowest", func(t *testing.T) { util.RunWithFollowerProtocolState(t, rootSnapshot, func(db *badger.DB, state *bprotocol.FollowerState) { // build a block to seal @@ -527,6 +536,133 @@ func TestSealingSegment(t *testing.T) { assertSealingSegmentBlocksQueryableAfterBootstrap(t, snapshot) }) }) + // Test the case where the reference block of the snapshot contains seals for blocks that are lower than the lowest sealing segment's block. + // This test case specifically checks if sealing segment includes both highest and lowest block sealed by head. + // ROOT <- B1 <- B2 <- B3(Seal_B1) <- B4 <- ... 
<- LastBlock(Seal_B2, Seal_B3, Seal_B4) + // Expected sealing segment: [B4, ..., B5], Extra blocks: [B2, B3] + t.Run("highest block seals outside segment", func(t *testing.T) { + util.RunWithFollowerProtocolState(t, rootSnapshot, func(db *badger.DB, state *bprotocol.FollowerState) { + + // build a block to seal + block1 := unittest.BlockWithParentFixture(head) + buildBlock(t, state, block1) + + // build a block sealing block1 + block2 := unittest.BlockWithParentFixture(block1.Header) + receipt1, seal1 := unittest.ReceiptAndSealForBlock(block1) + block2.SetPayload(unittest.PayloadFixture(unittest.WithReceipts(receipt1))) + buildBlock(t, state, block2) + + receipt2, seal2 := unittest.ReceiptAndSealForBlock(block2) + block3 := unittest.BlockWithParentFixture(block2.Header) + block3.SetPayload(unittest.PayloadFixture(unittest.WithSeals(seal1), unittest.WithReceipts(receipt2))) + buildBlock(t, state, block3) + + receipt3, seal3 := unittest.ReceiptAndSealForBlock(block3) + block4 := unittest.BlockWithParentFixture(block3.Header) + block4.SetPayload(unittest.PayloadFixture(unittest.WithReceipts(receipt3))) + buildBlock(t, state, block4) + + // build chain, so it's long enough to not target blocks as inside of flow.DefaultTransactionExpiry window. + parent := block4 + for i := 0; i < 1.5*flow.DefaultTransactionExpiry; i++ { + next := unittest.BlockWithParentFixture(parent.Header) + next.Header.View = next.Header.Height + 1 // set view so we are still in the same epoch + buildBlock(t, state, next) + parent = next + } + + receipt4, seal4 := unittest.ReceiptAndSealForBlock(block4) + lastBlock := unittest.BlockWithParentFixture(parent.Header) + lastBlock.SetPayload(unittest.PayloadFixture(unittest.WithSeals(seal2, seal3, seal4), unittest.WithReceipts(receipt4))) + buildBlock(t, state, lastBlock) + + snapshot := state.AtBlockID(lastBlock.ID()) + + // build a valid child to ensure we have a QC + buildBlock(t, state, unittest.BlockWithParentFixture(lastBlock.Header)) + + segment, err := snapshot.SealingSegment() + require.NoError(t, err) + assert.Equal(t, lastBlock.Header, segment.Highest().Header) + assert.Equal(t, block4.Header, segment.Sealed().Header) + unittest.AssertEqualBlocksLenAndOrder(t, []*flow.Block{block2, block3}, segment.ExtraBlocks) + assert.Len(t, segment.ExecutionResults, 2) + + assertSealingSegmentBlocksQueryableAfterBootstrap(t, snapshot) + }) + }) +} + +// TestBootstrapSealingSegmentWithExtraBlocks test sealing segment where the segment blocks contain collection +// guarantees referencing blocks prior to the sealing segment. After bootstrapping from sealing segment we should be able to +// extend with B7 with contains a guarantee referring B1. 
+// ROOT <- B1 <- B2(R1) <- B3 <- B4(S1) <- B5 <- B6(S2) +// Expected sealing segment: [B2, B3, B4, B5, B6], Extra blocks: [ROOT, B1] +func TestBootstrapSealingSegmentWithExtraBlocks(t *testing.T) { + identities := unittest.CompleteIdentitySet() + rootSnapshot := unittest.RootSnapshotFixture(identities) + rootEpoch := rootSnapshot.Epochs().Current() + cluster, err := rootEpoch.Cluster(0) + require.NoError(t, err) + collID := cluster.Members()[0].NodeID + head, err := rootSnapshot.Head() + require.NoError(t, err) + util.RunWithFullProtocolState(t, rootSnapshot, func(db *badger.DB, state *bprotocol.MutableState) { + block1 := unittest.BlockWithParentFixture(head) + buildBlock(t, state, block1) + receipt1, seal1 := unittest.ReceiptAndSealForBlock(block1) + + block2 := unittest.BlockWithParentFixture(block1.Header) + block2.SetPayload(unittest.PayloadFixture(unittest.WithReceipts(receipt1))) + buildBlock(t, state, block2) + + receipt2, seal2 := unittest.ReceiptAndSealForBlock(block2) + + block3 := unittest.BlockWithParentFixture(block2.Header) + buildBlock(t, state, block3) + + block4 := unittest.BlockWithParentFixture(block3.Header) + block4.SetPayload(unittest.PayloadFixture(unittest.WithReceipts(receipt2), unittest.WithSeals(seal1))) + buildBlock(t, state, block4) + + block5 := unittest.BlockWithParentFixture(block4.Header) + buildBlock(t, state, block5) + + block6 := unittest.BlockWithParentFixture(block5.Header) + block6.SetPayload(unittest.PayloadFixture(unittest.WithSeals(seal2))) + buildBlock(t, state, block6) + + snapshot := state.AtBlockID(block6.ID()) + segment, err := snapshot.SealingSegment() + require.NoError(t, err) + + // build a valid child to ensure we have a QC + buildBlock(t, state, unittest.BlockWithParentFixture(block6.Header)) + + // sealing segment should be [B2, B3, B4, B5, B6] + require.Len(t, segment.Blocks, 5) + unittest.AssertEqualBlocksLenAndOrder(t, []*flow.Block{block2, block3, block4, block5, block6}, segment.Blocks) + unittest.AssertEqualBlocksLenAndOrder(t, []*flow.Block{block1}, segment.ExtraBlocks[1:]) + require.Len(t, segment.ExecutionResults, 1) + + assertSealingSegmentBlocksQueryableAfterBootstrap(t, snapshot) + + // bootstrap from snapshot + util.RunWithFullProtocolState(t, snapshot, func(db *badger.DB, state *bprotocol.MutableState) { + block7 := unittest.BlockWithParentFixture(block6.Header) + guarantee := unittest.CollectionGuaranteeFixture(unittest.WithCollRef(block1.ID())) + guarantee.ChainID = cluster.ChainID() + + signerIndices, err := signature.EncodeSignersToIndices( + []flow.Identifier{collID}, []flow.Identifier{collID}) + require.NoError(t, err) + guarantee.SignerIndices = signerIndices + + block7.SetPayload(unittest.PayloadFixture(unittest.WithGuarantees(guarantee))) + buildBlock(t, state, block7) + }) + }) } func TestLatestSealedResult(t *testing.T) { diff --git a/state/protocol/badger/state.go b/state/protocol/badger/state.go index 69a9cc2ab54..58ba2ee1db0 100644 --- a/state/protocol/badger/state.go +++ b/state/protocol/badger/state.go @@ -33,6 +33,8 @@ type State struct { } // cache the root height because it cannot change over the lifecycle of a protocol state instance rootHeight uint64 + // cache the spork root block height because it cannot change over the lifecycle of a protocol state instance + sporkRootBlockHeight uint64 } type BootstrapConfig struct { @@ -96,7 +98,7 @@ func Bootstrap( // oldest ancestor and head is the newest child in the segment // TAIL <- ... 
<- HEAD highest := segment.Highest() // reference block of the snapshot - lowest := segment.Lowest() // last sealed block + lowest := segment.Sealed() // last sealed block // 1) bootstrap the sealing segment err = state.bootstrapSealingSegment(segment, highest)(tx) @@ -121,7 +123,7 @@ func Bootstrap( } // 4) initialize values related to the epoch logic - err = state.bootstrapEpoch(root, !config.SkipNetworkAddressValidation)(tx) + err = state.bootstrapEpoch(root.Epochs(), segment, !config.SkipNetworkAddressValidation)(tx) if err != nil { return fmt.Errorf("could not bootstrap epoch values: %w", err) } @@ -183,6 +185,19 @@ func (state *State) bootstrapSealingSegment(segment *flow.SealingSegment, head * } } + for _, block := range segment.ExtraBlocks { + blockID := block.ID() + height := block.Header.Height + err := state.blocks.StoreTx(block)(tx) + if err != nil { + return fmt.Errorf("could not insert root block: %w", err) + } + err = transaction.WithTx(operation.IndexBlockHeight(height, blockID))(tx) + if err != nil { + return fmt.Errorf("could not index root block segment (id=%x): %w", blockID, err) + } + } + for i, block := range segment.Blocks { blockID := block.ID() height := block.Header.Height @@ -240,7 +255,7 @@ func (state *State) bootstrapStatePointers(root protocol.Snapshot) func(*badger. return fmt.Errorf("could not get sealing segment: %w", err) } highest := segment.Highest() - lowest := segment.Lowest() + lowest := segment.Sealed() // find the finalized seal that seals the lowest block, meaning seal.BlockID == lowest.ID() seal, err := segment.FinalizedSeal() if err != nil { @@ -315,11 +330,11 @@ func (state *State) bootstrapStatePointers(root protocol.Snapshot) func(*badger. // // The root snapshot's sealing segment must not straddle any epoch transitions // or epoch phase transitions. -func (state *State) bootstrapEpoch(root protocol.Snapshot, verifyNetworkAddress bool) func(*transaction.Tx) error { +func (state *State) bootstrapEpoch(epochs protocol.EpochQuery, segment *flow.SealingSegment, verifyNetworkAddress bool) func(*transaction.Tx) error { return func(tx *transaction.Tx) error { - previous := root.Epochs().Previous() - current := root.Epochs().Current() - next := root.Epochs().Next() + previous := epochs.Previous() + current := epochs.Current() + next := epochs.Next() // build the status as we go status := new(flow.EpochStatus) @@ -430,11 +445,7 @@ func (state *State) bootstrapEpoch(root protocol.Snapshot, verifyNetworkAddress // NOTE: as specified in the godoc, this code assumes that each block // in the sealing segment in within the same phase within the same epoch. 
- segment, err := root.SealingSegment() - if err != nil { - return fmt.Errorf("could not get sealing segment: %w", err) - } - for _, block := range segment.Blocks { + for _, block := range segment.AllBlocks() { blockID := block.ID() err = state.epoch.statuses.StoreTx(blockID, status)(tx) if err != nil { @@ -461,6 +472,15 @@ func (state *State) bootstrapSporkInfo(root protocol.Snapshot) func(*badger.Txn) return fmt.Errorf("could not insert spork ID: %w", err) } + sporkRootBlockHeight, err := params.SporkRootBlockHeight() + if err != nil { + return fmt.Errorf("could not get spork root block height: %w", err) + } + err = operation.InsertSporkRootBlockHeight(sporkRootBlockHeight)(tx) + if err != nil { + return fmt.Errorf("could not insert spork root block height: %w", err) + } + version, err := params.ProtocolVersion() if err != nil { return fmt.Errorf("could not get protocol version: %w", err) @@ -683,6 +703,12 @@ func (state *State) populateCache() error { return fmt.Errorf("could not read root block to populate cache: %w", err) } state.rootHeight = rootHeight + + sporkRootBlockHeight, err := state.Params().SporkRootBlockHeight() + if err != nil { + return fmt.Errorf("could not read spork root block height: %w", err) + } + state.sporkRootBlockHeight = sporkRootBlockHeight return nil } diff --git a/state/protocol/badger/state_test.go b/state/protocol/badger/state_test.go index 7a6f0aeb9a4..32aff398735 100644 --- a/state/protocol/badger/state_test.go +++ b/state/protocol/badger/state_test.go @@ -442,7 +442,13 @@ func assertSealingSegmentBlocksQueryableAfterBootstrap(t *testing.T, snapshot pr require.NoError(t, err) segment, err := state.Final().SealingSegment() - assert.NoError(t, err) + require.NoError(t, err) + + rootBlock, err := state.Params().Root() + require.NoError(t, err) + + // root block should be the highest block from the sealing segment + assert.Equal(t, segment.Highest().Header, rootBlock) // for each block in the sealing segment we should be able to query: // * Head diff --git a/state/protocol/badger/validity.go b/state/protocol/badger/validity.go index 9a80046831e..e248c7057e8 100644 --- a/state/protocol/badger/validity.go +++ b/state/protocol/badger/validity.go @@ -220,7 +220,7 @@ func IsValidRootSnapshot(snap protocol.Snapshot, verifyResultID bool) error { } highest := segment.Highest() // reference block of the snapshot - lowest := segment.Lowest() // last sealed block + lowest := segment.Sealed() // last sealed block highestID := highest.ID() lowestID := lowest.ID() @@ -350,3 +350,54 @@ func validateClusterQC(cluster protocol.Cluster) error { } return nil } + +// SanityCheckConsensusNodeRootSnapshotValidity performs a sanity check to make sure root snapshot has enough history +// to participate in consensus, we require one of next conditions to pass: +// 1. IsSporkRootSnapshot() == true - this is a snapshot build from a first block of spork. +// 2. snapshot.SealingSegment().Len() >= transaction_expiry_limit - such snapshot +// has enough history to validate collection guarantees and can safely participate in consensus. 
+func SanityCheckConsensusNodeRootSnapshotValidity(snapshot protocol.Snapshot) error { + isSporkRootSnapshot, err := protocol.IsSporkRootSnapshot(snapshot) + if err != nil { + return fmt.Errorf("could not check if root snapshot is a spork root snapshot: %w", err) + } + // condition 1 satisfied + if isSporkRootSnapshot { + return nil + } + + // check condition 2 + head, err := snapshot.Head() + if err != nil { + return fmt.Errorf("could not query root snapshot head: %w", err) + } + + sporkRootBlockHeight, err := snapshot.Params().SporkRootBlockHeight() + if err != nil { + return fmt.Errorf("could not query spork root block height: %w", err) + } + + sealingSegment, err := snapshot.SealingSegment() + if err != nil { + return fmt.Errorf("could not query sealing segment: %w", err) + } + + sealingSegmentLength := uint64(len(sealingSegment.AllBlocks())) + transactionExpiry := uint64(flow.DefaultTransactionExpiry) + blocksInSpork := head.Height - sporkRootBlockHeight + // check if head.Height - sporkRootBlockHeight < flow.DefaultTransactionExpiry + // this is the case where we bootstrap early into the spork and there are simply not enough blocks yet + if blocksInSpork < transactionExpiry { + // the distance to the spork root is less than the transaction expiry, so we need all blocks back to the spork root. + if sealingSegmentLength != blocksInSpork { + return fmt.Errorf("invalid root snapshot length, expecting exactly (%d), got (%d)", blocksInSpork, sealingSegmentLength) + } + } else { + // the distance to the spork root is at least the transaction expiry, so we need at least `transactionExpiry` many blocks + if sealingSegmentLength < transactionExpiry { + return fmt.Errorf("invalid root snapshot length, expecting at least (%d), got (%d)", + transactionExpiry, sealingSegmentLength) + } + } + return nil +} diff --git a/state/protocol/badger/validity_test.go b/state/protocol/badger/validity_test.go index 60c278247b4..7dcf8390a5b 100644 --- a/state/protocol/badger/validity_test.go +++ b/state/protocol/badger/validity_test.go @@ -101,3 +101,46 @@ func TestBootstrapInvalidEpochCommit(t *testing.T) { require.Error(t, err) }) } + +// TestBootstrapConsensusRootSnapshot tests that we perform the correct sanity checks when bootstrapping consensus nodes: +// we expect to only bootstrap from snapshots with sufficient history.
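A worked sketch of the acceptance rule implemented above, using a hypothetical helper and plain numbers (with a stand-in constant for flow.DefaultTransactionExpiry): a snapshot taken early in the spork must carry every block back to the spork root, while a later snapshot must carry at least one full transaction-expiry window of blocks.

package main

import "fmt"

const defaultTransactionExpiry = 600 // stand-in for flow.DefaultTransactionExpiry

// acceptableSegmentLength mirrors the two length conditions of the
// consensus-node root snapshot check for a non-spork-root snapshot.
func acceptableSegmentLength(headHeight, sporkRootHeight, segmentLength uint64) error {
	blocksInSpork := headHeight - sporkRootHeight
	if blocksInSpork < defaultTransactionExpiry {
		// young spork: every block back to the spork root must be present
		if segmentLength != blocksInSpork {
			return fmt.Errorf("expected exactly %d blocks, got %d", blocksInSpork, segmentLength)
		}
		return nil
	}
	// mature spork: at least a full expiry window must be present
	if segmentLength < defaultTransactionExpiry {
		return fmt.Errorf("expected at least %d blocks, got %d", defaultTransactionExpiry, segmentLength)
	}
	return nil
}

func main() {
	fmt.Println(acceptableSegmentLength(300, 0, 300))  // young spork, full history: <nil>
	fmt.Println(acceptableSegmentLength(300, 0, 250))  // young spork, missing blocks: error
	fmt.Println(acceptableSegmentLength(5000, 0, 700)) // mature spork, at least one expiry window: <nil>
	fmt.Println(acceptableSegmentLength(5000, 0, 500)) // mature spork, too short: error
}

The tests that follow cover exactly these cases against the real snapshot fixtures.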
+func TestBootstrapConsensusRootSnapshot(t *testing.T) { + t.Run("spork-root-snapshot", func(t *testing.T) { + rootSnapshot := unittest.RootSnapshotFixture(participants) + err := SanityCheckConsensusNodeRootSnapshotValidity(rootSnapshot) + require.NoError(t, err) + }) + t.Run("not-enough-history", func(t *testing.T) { + rootSnapshot := unittest.RootSnapshotFixture(participants) + rootSnapshot.Encodable().Head.Height += 10 // advance height to be not spork root snapshot + err := SanityCheckConsensusNodeRootSnapshotValidity(rootSnapshot) + require.Error(t, err) + }) + t.Run("enough-history-spork-just-started", func(t *testing.T) { + rootSnapshot := unittest.RootSnapshotFixture(participants) + // advance height to be not spork root snapshot, but still lower than transaction expiry + rootSnapshot.Encodable().Head.Height += flow.DefaultTransactionExpiry / 2 + // add blocks to sealing segment + rootSnapshot.Encodable().SealingSegment.ExtraBlocks = unittest.BlockFixtures(int(flow.DefaultTransactionExpiry/2) - 1) + err := SanityCheckConsensusNodeRootSnapshotValidity(rootSnapshot) + require.NoError(t, err) + }) + t.Run("enough-history-long-spork", func(t *testing.T) { + rootSnapshot := unittest.RootSnapshotFixture(participants) + // advance height to be not spork root snapshot + rootSnapshot.Encodable().Head.Height += flow.DefaultTransactionExpiry * 2 + // add blocks to sealing segment + rootSnapshot.Encodable().SealingSegment.ExtraBlocks = unittest.BlockFixtures(int(flow.DefaultTransactionExpiry) - 1) + err := SanityCheckConsensusNodeRootSnapshotValidity(rootSnapshot) + require.NoError(t, err) + }) + t.Run("more-history-than-needed", func(t *testing.T) { + rootSnapshot := unittest.RootSnapshotFixture(participants) + // advance height to be not spork root snapshot + rootSnapshot.Encodable().Head.Height += flow.DefaultTransactionExpiry * 2 + // add blocks to sealing segment + rootSnapshot.Encodable().SealingSegment.ExtraBlocks = unittest.BlockFixtures(flow.DefaultTransactionExpiry * 2) + err := SanityCheckConsensusNodeRootSnapshotValidity(rootSnapshot) + require.NoError(t, err) + }) +} diff --git a/state/protocol/inmem/convert.go b/state/protocol/inmem/convert.go index f75e595fe72..cf26b1d99b0 100644 --- a/state/protocol/inmem/convert.go +++ b/state/protocol/inmem/convert.go @@ -16,7 +16,6 @@ import ( // one that can easily be serialized to disk or to network. // TODO error docs func FromSnapshot(from protocol.Snapshot) (*Snapshot, error) { - var ( snap EncodableSnapshot err error @@ -89,7 +88,6 @@ func FromSnapshot(from protocol.Snapshot) (*Snapshot, error) { // FromParams converts any protocol.GlobalParams to a memory-backed Params. 
// TODO error docs func FromParams(from protocol.GlobalParams) (*Params, error) { - var ( params EncodableParams err error @@ -103,10 +101,18 @@ func FromParams(from protocol.GlobalParams) (*Params, error) { if err != nil { return nil, fmt.Errorf("could not get spork id: %w", err) } + params.SporkRootBlockHeight, err = from.SporkRootBlockHeight() + if err != nil { + return nil, fmt.Errorf("could not get spork root block height: %w", err) + } params.ProtocolVersion, err = from.ProtocolVersion() if err != nil { return nil, fmt.Errorf("could not get protocol version: %w", err) } + params.EpochCommitSafetyThreshold, err = from.EpochCommitSafetyThreshold() + if err != nil { + return nil, fmt.Errorf("could not get protocol version: %w", err) + } return &Params{params}, nil } @@ -117,7 +123,6 @@ func FromParams(from protocol.GlobalParams) (*Params, error) { // * protocol.ErrNextEpochNotSetup - if the epoch represents a next epoch which has not been set up. // * state.ErrUnknownSnapshotReference - if the epoch is queried from an unresolvable snapshot. func FromEpoch(from protocol.Epoch) (*Epoch, error) { - var ( epoch EncodableEpoch err error @@ -248,7 +253,6 @@ func SnapshotFromBootstrapStateWithParams( protocolVersion uint, epochCommitSafetyThreshold uint64, ) (*Snapshot, error) { - setup, ok := result.ServiceEvents[0].Event.(*flow.EpochSetup) if !ok { return nil, fmt.Errorf("invalid setup event type (%T)", result.ServiceEvents[0].Event) @@ -290,6 +294,7 @@ func SnapshotFromBootstrapStateWithParams( params := EncodableParams{ ChainID: root.Header.ChainID, // chain ID must match the root block SporkID: root.ID(), // use root block ID as the unique spork identifier + SporkRootBlockHeight: root.Header.Height, // use root block height as the spork root block height ProtocolVersion: protocolVersion, // major software version for this spork EpochCommitSafetyThreshold: epochCommitSafetyThreshold, // see protocol.Params for details } @@ -304,6 +309,7 @@ func SnapshotFromBootstrapStateWithParams( ExecutionResults: flow.ExecutionResultList{result}, LatestSeals: map[flow.Identifier]flow.Identifier{root.ID(): seal.ID()}, FirstSeal: seal, + ExtraBlocks: make([]*flow.Block, 0), }, QuorumCertificate: qc, Phase: flow.EpochPhaseStaking, diff --git a/state/protocol/inmem/encodable.go b/state/protocol/inmem/encodable.go index 48599c00b39..c52d85ae3e0 100644 --- a/state/protocol/inmem/encodable.go +++ b/state/protocol/inmem/encodable.go @@ -66,6 +66,7 @@ type EncodableCluster struct { type EncodableParams struct { ChainID flow.ChainID SporkID flow.Identifier + SporkRootBlockHeight uint64 ProtocolVersion uint EpochCommitSafetyThreshold uint64 } diff --git a/state/protocol/inmem/params.go b/state/protocol/inmem/params.go index 4bc380add9d..64e00eb4eda 100644 --- a/state/protocol/inmem/params.go +++ b/state/protocol/inmem/params.go @@ -16,6 +16,10 @@ func (p Params) SporkID() (flow.Identifier, error) { return p.enc.SporkID, nil } +func (p Params) SporkRootBlockHeight() (uint64, error) { + return p.enc.SporkRootBlockHeight, nil +} + func (p Params) ProtocolVersion() (uint, error) { return p.enc.ProtocolVersion, nil } diff --git a/state/protocol/invalid/params.go b/state/protocol/invalid/params.go index 934d6cd84fb..a131d4517a8 100644 --- a/state/protocol/invalid/params.go +++ b/state/protocol/invalid/params.go @@ -17,6 +17,10 @@ func (p Params) SporkID() (flow.Identifier, error) { return flow.ZeroID, p.err } +func (p Params) SporkRootBlockHeight() (uint64, error) { + return 0, p.err +} + func (p Params) 
diff --git a/state/protocol/invalid/params.go b/state/protocol/invalid/params.go
index 934d6cd84fb..a131d4517a8 100644
--- a/state/protocol/invalid/params.go
+++ b/state/protocol/invalid/params.go
@@ -17,6 +17,10 @@ func (p Params) SporkID() (flow.Identifier, error) {
     return flow.ZeroID, p.err
 }
 
+func (p Params) SporkRootBlockHeight() (uint64, error) {
+    return 0, p.err
+}
+
 func (p Params) ProtocolVersion() (uint, error) {
     return 0, p.err
 }
diff --git a/state/protocol/mock/global_params.go b/state/protocol/mock/global_params.go
index 1cdab3f3684..4ecf14ed03f 100644
--- a/state/protocol/mock/global_params.go
+++ b/state/protocol/mock/global_params.go
@@ -98,6 +98,27 @@ func (_m *GlobalParams) SporkID() (flow.Identifier, error) {
     return r0, r1
 }
 
+// SporkRootBlockHeight provides a mock function with given fields:
+func (_m *GlobalParams) SporkRootBlockHeight() (uint64, error) {
+    ret := _m.Called()
+
+    var r0 uint64
+    if rf, ok := ret.Get(0).(func() uint64); ok {
+        r0 = rf()
+    } else {
+        r0 = ret.Get(0).(uint64)
+    }
+
+    var r1 error
+    if rf, ok := ret.Get(1).(func() error); ok {
+        r1 = rf()
+    } else {
+        r1 = ret.Error(1)
+    }
+
+    return r0, r1
+}
+
 type mockConstructorTestingTNewGlobalParams interface {
     mock.TestingT
     Cleanup(func())
diff --git a/state/protocol/mock/params.go b/state/protocol/mock/params.go
index 13b2c25a2fe..000140f5d42 100644
--- a/state/protocol/mock/params.go
+++ b/state/protocol/mock/params.go
@@ -165,6 +165,27 @@ func (_m *Params) SporkID() (flow.Identifier, error) {
     return r0, r1
 }
 
+// SporkRootBlockHeight provides a mock function with given fields:
+func (_m *Params) SporkRootBlockHeight() (uint64, error) {
+    ret := _m.Called()
+
+    var r0 uint64
+    if rf, ok := ret.Get(0).(func() uint64); ok {
+        r0 = rf()
+    } else {
+        r0 = ret.Get(0).(uint64)
+    }
+
+    var r1 error
+    if rf, ok := ret.Get(1).(func() error); ok {
+        r1 = rf()
+    } else {
+        r1 = ret.Error(1)
+    }
+
+    return r0, r1
+}
+
 type mockConstructorTestingTNewParams interface {
     mock.TestingT
     Cleanup(func())
diff --git a/state/protocol/params.go b/state/protocol/params.go
index 6012b6a5dca..2c65ae73690 100644
--- a/state/protocol/params.go
+++ b/state/protocol/params.go
@@ -53,6 +53,13 @@ type GlobalParams interface {
     // No errors are expected during normal operation.
     SporkID() (flow.Identifier, error)
 
+    // SporkRootBlockHeight returns the height of the spork's root block.
+    // This value is determined at the beginning of a spork during bootstrapping.
+    // If a node bootstraps from a sealing segment, this value is carried over
+    // as part of the snapshot.
+    // No errors are expected during normal operation.
+    SporkRootBlockHeight() (uint64, error)
+
     // ProtocolVersion returns the protocol version, the major software version
     // of the protocol software.
     // No errors are expected during normal operation.
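Not part of the patch: with the regenerated mocks above, downstream unit tests can stub the new accessor in the usual testify style. A minimal sketch; the package alias and the height value are arbitrary:

// Illustrative sketch only; not part of this diff.
package sketch

import (
    "testing"

    "github.com/stretchr/testify/require"

    protocolmock "github.com/onflow/flow-go/state/protocol/mock"
)

// TestSporkRootBlockHeightMock stubs the new GlobalParams accessor and reads it back.
func TestSporkRootBlockHeightMock(t *testing.T) {
    params := protocolmock.NewGlobalParams(t)
    params.On("SporkRootBlockHeight").Return(uint64(1_000_000), nil)

    height, err := params.SporkRootBlockHeight()
    require.NoError(t, err)
    require.Equal(t, uint64(1_000_000), height)
}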
diff --git a/state/protocol/util.go b/state/protocol/util.go
index d825bf7db92..7807a55edee 100644
--- a/state/protocol/util.go
+++ b/state/protocol/util.go
@@ -65,15 +65,15 @@ func CheckNodeStatusAt(snapshot Snapshot, id flow.Identifier, checks ...flow.Ide
 // IsSporkRootSnapshot returns whether the given snapshot is the state snapshot
 // representing the initial state for a spork.
 func IsSporkRootSnapshot(snapshot Snapshot) (bool, error) {
-    segment, err := snapshot.SealingSegment()
+    sporkRootBlockHeight, err := snapshot.Params().SporkRootBlockHeight()
     if err != nil {
-        return false, fmt.Errorf("could not get snapshot head: %w", err)
+        return false, fmt.Errorf("could not get snapshot root block height: %w", err)
     }
-    if len(segment.Blocks) > 1 {
-        // spork root snapshots uniquely have only one block in the sealing segment
-        return false, nil
+    head, err := snapshot.Head()
+    if err != nil {
+        return false, fmt.Errorf("could not get snapshot head: %w", err)
     }
-    return true, nil
+    return head.Height == sporkRootBlockHeight, nil
 }
 
 // PreviousEpochExists returns whether the previous epoch exists w.r.t. the given
diff --git a/state/protocol/util_test.go b/state/protocol/util_test.go
index d63f1b78029..7858f5767b7 100644
--- a/state/protocol/util_test.go
+++ b/state/protocol/util_test.go
@@ -26,7 +26,7 @@ func TestIsSporkRootSnapshot(t *testing.T) {
     t.Run("other snapshot", func(t *testing.T) {
         snapshot := unittest.RootSnapshotFixture(unittest.IdentityListFixture(10, unittest.WithAllRoles()))
-        snapshot.Encodable().SealingSegment.Blocks = unittest.BlockFixtures(5)
+        snapshot.Encodable().Head.Height += 1 // modify head height to break equivalence with spork root block height
         isSporkRoot, err := protocol.IsSporkRootSnapshot(snapshot)
         require.NoError(t, err)
         assert.False(t, isSporkRoot)
diff --git a/storage/badger/operation/prefix.go b/storage/badger/operation/prefix.go
index 1ce44a51d7d..400f475b2c6 100644
--- a/storage/badger/operation/prefix.go
+++ b/storage/badger/operation/prefix.go
@@ -24,13 +24,14 @@ const (
     codeSporkID                    = 13
     codeProtocolVersion            = 14
     codeEpochCommitSafetyThreshold = 15
+    codeSporkRootBlockHeight       = 16
 
     // code for heights with special meaning
     codeFinalizedHeight         = 20 // latest finalized block height
     codeSealedHeight            = 21 // latest sealed block height
     codeClusterHeight           = 22 // latest finalized height on cluster
     codeExecutedBlock           = 23 // latest executed block with max height
-    codeRootHeight              = 24 // the height of the first loaded block
+    codeRootHeight              = 24 // the height of the highest block contained in the root snapshot
     codeLastCompleteBlockHeight = 25 // the height of the last block for which all collections were received
 
     // codes for single entity storage
diff --git a/storage/badger/operation/spork.go b/storage/badger/operation/spork.go
index b85e3e904ae..9f80afcddf9 100644
--- a/storage/badger/operation/spork.go
+++ b/storage/badger/operation/spork.go
@@ -18,6 +18,18 @@ func RetrieveSporkID(sporkID *flow.Identifier) func(*badger.Txn) error {
     return retrieve(makePrefix(codeSporkID), sporkID)
 }
 
+// InsertSporkRootBlockHeight inserts the spork root block height for the present spork.
+// A single database and protocol state instance spans at most one spork, so this is inserted
+// exactly once, when bootstrapping the state.
+func InsertSporkRootBlockHeight(height uint64) func(*badger.Txn) error {
+    return insert(makePrefix(codeSporkRootBlockHeight), height)
+}
+
+// RetrieveSporkRootBlockHeight retrieves the spork root block height for the present spork.
+func RetrieveSporkRootBlockHeight(height *uint64) func(*badger.Txn) error {
+    return retrieve(makePrefix(codeSporkRootBlockHeight), height)
+}
+
 // InsertProtocolVersion inserts the protocol version for the present spork.
 // A single database and protocol state instance spans at most one spork, and
 // a spork has exactly one protocol version for its duration, so this is
diff --git a/utils/unittest/mocks/protocol_state.go b/utils/unittest/mocks/protocol_state.go
index 615e8e39290..7eef54109fb 100644
--- a/utils/unittest/mocks/protocol_state.go
+++ b/utils/unittest/mocks/protocol_state.go
@@ -50,6 +50,10 @@ func (p *Params) SporkID() (flow.Identifier, error) {
     return flow.ZeroID, fmt.Errorf("not implemented")
 }
 
+func (p *Params) SporkRootBlockHeight() (uint64, error) {
+    return 0, fmt.Errorf("not implemented")
+}
+
 func (p *Params) ProtocolVersion() (uint, error) {
     return 0, fmt.Errorf("not implemented")
 }
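Not part of the patch: the new InsertSporkRootBlockHeight and RetrieveSporkRootBlockHeight operations compose with Badger transactions exactly like the existing spork ID operations, with the insert running once during bootstrapping and the retrieve backing a Params accessor. A minimal read-side sketch, assuming the repository's Badger v2 dependency (adjust the import if the module differs); the function name is an assumption:

// Illustrative sketch only; not part of this diff.
package sketch

import (
    "fmt"

    "github.com/dgraph-io/badger/v2"

    "github.com/onflow/flow-go/storage/badger/operation"
)

// readSporkRootBlockHeight runs the retrieval functor inside a read-only transaction,
// mirroring how a badger-backed protocol.Params implementation would expose the value.
func readSporkRootBlockHeight(db *badger.DB) (uint64, error) {
    var height uint64
    err := db.View(operation.RetrieveSporkRootBlockHeight(&height))
    if err != nil {
        return 0, fmt.Errorf("could not retrieve spork root block height: %w", err)
    }
    return height, nil
}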