Skip to content

Commit

Permalink
Merge pull request #3822 from onflow/feature/active-pacemaker
Browse files Browse the repository at this point in the history
Sync Pacemaker Branch to `v0.29`
  • Loading branch information
jordanschalm authored Jan 17, 2023
2 parents 3eabd4d + 3a52f2e commit 7f02a64
Show file tree
Hide file tree
Showing 62 changed files with 1,047 additions and 209 deletions.
29 changes: 19 additions & 10 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func main() {

nodeBuilder.
PreInit(cmd.DynamicStartPreInit).
ValidateRootSnapshot(badgerState.SanityCheckConsensusNodeRootSnapshotValidity).
Module("consensus node metrics", func(node *cmd.NodeConfig) error {
conMetrics = metrics.NewConsensusCollector(node.Tracer, node.MetricsRegisterer)
return nil
Expand Down Expand Up @@ -546,12 +547,13 @@ func main() {
)
signer = verification.NewMetricsWrapper(signer, mainMetrics) // wrapper for measuring time spent with crypto-related operations

// create consensus logger
logger := createLogger(node.Logger, node.RootChainID)

// initialize a logging notifier for hotstuff
notifier := createNotifier(
node.Logger,
logger,
mainMetrics,
node.Tracer,
node.RootChainID,
)

notifier.AddConsumer(finalizationDistributor)
Expand Down Expand Up @@ -581,7 +583,7 @@ func main() {
voteProcessorFactory := votecollector.NewCombinedVoteProcessorFactory(wrappedCommittee, qcDistributor.OnQcConstructedFromVotes)
lowestViewForVoteProcessing := finalizedBlock.View + 1
voteAggregator, err := consensus.NewVoteAggregator(
node.Logger,
logger,
mainMetrics,
node.Metrics.Engine,
node.Metrics.Mempool,
Expand All @@ -594,9 +596,15 @@ func main() {
}

timeoutCollectorDistributor := pubsub.NewTimeoutCollectorDistributor()
timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory(timeoutCollectorDistributor, committee, validator, msig.ConsensusTimeoutTag)
timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory(
logger,
timeoutCollectorDistributor,
committee,
validator,
msig.ConsensusTimeoutTag,
)
timeoutAggregator, err := consensus.NewTimeoutAggregator(
node.Logger,
logger,
mainMetrics,
node.Metrics.Engine,
node.Metrics.Mempool,
Expand Down Expand Up @@ -669,7 +677,7 @@ func main() {

// initialize hotstuff consensus algorithm
hot, err = consensus.NewParticipant(
node.Logger,
createLogger(node.Logger, node.RootChainID),
mainMetrics,
build,
finalizedBlock,
Expand All @@ -689,7 +697,8 @@ func main() {
// initialize the pending blocks cache
proposals := buffer.NewPendingBlocks()

complianceCore, err := compliance.NewCore(node.Logger,
logger := createLogger(node.Logger, node.RootChainID)
complianceCore, err := compliance.NewCore(logger,
node.Metrics.Engine,
node.Metrics.Mempool,
mainMetrics,
Expand All @@ -713,7 +722,7 @@ func main() {

// initialize the compliance engine
comp, err = compliance.NewEngine(
node.Logger,
logger,
node.Me,
complianceCore,
)
Expand All @@ -727,7 +736,7 @@ func main() {
}).
Component("consensus message hub", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
messageHub, err := message_hub.NewMessageHub(
node.Logger,
createLogger(node.Logger, node.RootChainID),
node.Metrics.Engine,
node.Network,
node.Me,
Expand Down
11 changes: 8 additions & 3 deletions cmd/consensus/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ import (
metricsconsumer "github.com/onflow/flow-go/module/metrics/hotstuff"
)

func createNotifier(log zerolog.Logger, metrics module.HotstuffMetrics, tracer module.Tracer, chain flow.ChainID,
) *pubsub.Distributor {
telemetryConsumer := notifications.NewTelemetryConsumer(log, chain)
// createLogger derives a child logger from log that attaches the given chain ID,
// under the "chain" key, to every log message it emits.
func createLogger(log zerolog.Logger, chainID flow.ChainID) zerolog.Logger {
	withChain := log.With().Str("chain", chainID.String())
	return withChain.Logger()
}

// createNotifier creates a pubsub distributor and connects it to consensus consumers.
func createNotifier(log zerolog.Logger, metrics module.HotstuffMetrics) *pubsub.Distributor {
telemetryConsumer := notifications.NewTelemetryConsumer(log)
metricsConsumer := metricsconsumer.NewMetricsConsumer(metrics)
logsConsumer := notifications.NewLogConsumer(log)
dis := pubsub.NewDistributor()
Expand Down
4 changes: 4 additions & 0 deletions cmd/node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ type NodeBuilder interface {
// ValidateFlags sets any custom validation rules for the command line flags,
// for example where certain combinations aren't allowed
ValidateFlags(func() error) NodeBuilder

// ValidateRootSnapshot sets any custom validation rules for the root snapshot.
// This check is executed after other checks but before applying any data from root snapshot.
ValidateRootSnapshot(f func(protocol.Snapshot) error) NodeBuilder
}

// BaseConfig is the general config for the NodeBuilder and the command line params
Expand Down
16 changes: 16 additions & 0 deletions cmd/scaffold.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,15 @@ type FlowNodeBuilder struct {
postShutdownFns []func() error
preInitFns []BuilderFunc
postInitFns []BuilderFunc
extraRootSnapshotCheck func(protocol.Snapshot) error
extraFlagCheck func() error
adminCommandBootstrapper *admin.CommandRunnerBootstrapper
adminCommands map[string]func(config *NodeConfig) commands.AdminCommand
componentBuilder component.ComponentManagerBuilder
}

var _ NodeBuilder = (*FlowNodeBuilder)(nil)

func (fnb *FlowNodeBuilder) BaseFlags() {
defaultConfig := DefaultBaseConfig()

Expand Down Expand Up @@ -550,6 +553,11 @@ func (fnb *FlowNodeBuilder) ParseAndPrintFlags() error {
return fnb.extraFlagsValidation()
}

// ValidateRootSnapshot registers f as an additional, node-type-specific check on the root snapshot.
// f is stored in extraRootSnapshotCheck, replacing any check registered by an earlier call.
// The builder itself is returned so that calls can be chained.
func (fnb *FlowNodeBuilder) ValidateRootSnapshot(f func(protocol.Snapshot) error) NodeBuilder {
fnb.extraRootSnapshotCheck = f
return fnb
}

func (fnb *FlowNodeBuilder) ValidateFlags(f func() error) NodeBuilder {
fnb.extraFlagCheck = f
return fnb
Expand Down Expand Up @@ -1096,6 +1104,14 @@ func (fnb *FlowNodeBuilder) setRootSnapshot(rootSnapshot protocol.Snapshot) erro
return fmt.Errorf("failed to validate root snapshot QCs: %w", err)
}

// perform extra checks requested by specific node types
if fnb.extraRootSnapshotCheck != nil {
err = fnb.extraRootSnapshotCheck(rootSnapshot)
if err != nil {
return fmt.Errorf("failed to perform extra checks on root snapshot: %w", err)
}
}

fnb.RootSnapshot = rootSnapshot
// cache properties of the root snapshot, for convenience
fnb.RootResult, fnb.RootSeal, err = fnb.RootSnapshot.SealedResult()
Expand Down
2 changes: 1 addition & 1 deletion consensus/aggregators.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func NewTimeoutAggregator(log zerolog.Logger,
lowestRetainedView uint64,
) (hotstuff.TimeoutAggregator, error) {

timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(notifier, distributor, timeoutProcessorFactory)
timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(log, notifier, distributor, timeoutProcessorFactory)
collectors := timeoutaggregator.NewTimeoutCollectors(log, lowestRetainedView, timeoutCollectorFactory)

// initialize the timeout aggregator
Expand Down
38 changes: 34 additions & 4 deletions consensus/hotstuff/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,15 @@ type Consumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnQcTriggeredViewChange(qc *flow.QuorumCertificate, newView uint64)
OnQcTriggeredViewChange(oldView uint64, newView uint64, qc *flow.QuorumCertificate)

// OnTcTriggeredViewChange notifications are produced by PaceMaker when it moves to a new view
// based on processing a TC. The arguments specify the oldView (first argument), the newView to which
// the PaceMaker transitioned (second argument), and the tc which triggered the view change (third argument).
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnTcTriggeredViewChange(tc *flow.TimeoutCertificate, newView uint64)
OnTcTriggeredViewChange(oldView uint64, newView uint64, tc *flow.TimeoutCertificate)

// OnStartingTimeout notifications are produced by PaceMaker. Such a notification indicates that the
// PaceMaker is now waiting for the system to (receive and) process blocks or votes.
Expand All @@ -133,6 +133,36 @@ type Consumer interface {
// and must handle repetition of the same events (with some processing overhead).
OnStartingTimeout(model.TimerInfo)

// OnVoteProcessed notifications are produced by the Vote Aggregation logic, each time
// we successfully ingest a valid vote.
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnVoteProcessed(vote *model.Vote)

// OnTimeoutProcessed notifications are produced by the Timeout Aggregation logic,
// each time we successfully ingest a valid timeout.
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnTimeoutProcessed(timeout *model.TimeoutObject)

// OnCurrentViewDetails notifications are produced by the EventHandler during the course of a view with auxiliary information.
// These notifications are generally not produced for all views (for example skipped views).
// These notifications are guaranteed to be produced for all views we enter after fully processing a message.
// Example 1:
// - We are in view 8. We process a QC with view 10, causing us to enter view 11.
// - Then this notification will be produced for view 11.
// Example 2:
// - We are in view 8. We process a proposal with view 10, which contains a TC for view 9 and TC.NewestQC for view 8.
// - The QC would allow us to enter view 9 and the TC would allow us to enter view 10,
// so after fully processing the message we are in view 10.
// - Then this notification will be produced for view 10, but not view 9
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnCurrentViewDetails(currentView, finalizedView uint64, currentLeader flow.Identifier)

// OnDoubleVotingDetected notifications are produced by the Vote Aggregation logic
// whenever a double voting (same voter voting for different blocks at the same view) was detected.
// Prerequisites:
Expand All @@ -145,7 +175,7 @@ type Consumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnInvalidVoteDetected(*model.Vote)
OnInvalidVoteDetected(err model.InvalidVoteError)

// OnVoteForInvalidBlockDetected notifications are produced by the Vote Aggregation logic
// whenever a vote for an invalid proposal was detected.
Expand All @@ -166,7 +196,7 @@ type Consumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnInvalidTimeoutDetected(*model.TimeoutObject)
OnInvalidTimeoutDetected(err model.InvalidTimeoutError)
}

// QCCreatedConsumer consumes outbound notifications produced by HotStuff and its components.
Expand Down
5 changes: 4 additions & 1 deletion consensus/hotstuff/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,11 +369,14 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
if err != nil {
return fmt.Errorf("failed to determine primary for new view %d: %w", curView, err)
}
finalizedView := e.forks.FinalizedView()
log := e.log.With().
Uint64("cur_view", curView).
Uint64("finalized_view", e.forks.FinalizedView()).
Uint64("finalized_view", finalizedView).
Hex("leader_id", currentLeader[:]).Logger()

e.notifier.OnCurrentViewDetails(curView, finalizedView, currentLeader)

// check that I am the primary for this view and that I haven't already proposed; otherwise there is nothing to do
if e.committee.Self() != currentLeader {
return nil
Expand Down
5 changes: 3 additions & 2 deletions consensus/hotstuff/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ func initPaceMaker(t require.TestingT, ctx context.Context, livenessData *hotstu
persist.On("GetLivenessData").Return(livenessData, nil).Once()
pm := NewTestPaceMaker(timeout.NewController(tc), notifier, persist)
notifier.On("OnStartingTimeout", mock.Anything).Return()
notifier.On("OnQcTriggeredViewChange", mock.Anything, mock.Anything).Return()
notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything).Return()
notifier.On("OnQcTriggeredViewChange", mock.Anything, mock.Anything, mock.Anything).Return()
notifier.On("OnTcTriggeredViewChange", mock.Anything, mock.Anything, mock.Anything).Return()
notifier.On("OnViewChange", mock.Anything, mock.Anything).Maybe()
pm.Start(ctx)
return pm
Expand Down Expand Up @@ -301,6 +301,7 @@ func (es *EventHandlerSuite) SetupTest() {
es.notifier.On("OnReceiveTc", mock.Anything, mock.Anything).Maybe()
es.notifier.On("OnPartialTc", mock.Anything, mock.Anything).Maybe()
es.notifier.On("OnLocalTimeout", mock.Anything).Maybe()
es.notifier.On("OnCurrentViewDetails", mock.Anything, mock.Anything, mock.Anything).Maybe()

eventhandler, err := NewEventHandler(
zerolog.New(os.Stderr),
Expand Down
15 changes: 13 additions & 2 deletions consensus/hotstuff/integration/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,22 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
nil,
).Maybe()

p, err := timeoutcollector.NewTimeoutProcessor(in.committee, in.validator, aggregator, collectorDistributor)
p, err := timeoutcollector.NewTimeoutProcessor(
unittest.Logger(),
in.committee,
in.validator,
aggregator,
collectorDistributor,
)
require.NoError(t, err)
return p
}, nil).Maybe()
timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(notifier, collectorDistributor, timeoutProcessorFactory)
timeoutCollectorFactory := timeoutcollector.NewTimeoutCollectorFactory(
unittest.Logger(),
notifier,
collectorDistributor,
timeoutProcessorFactory,
)
timeoutCollectors := timeoutaggregator.NewTimeoutCollectors(log, livenessData.CurrentView, timeoutCollectorFactory)

// initialize the timeout aggregator
Expand Down
39 changes: 27 additions & 12 deletions consensus/hotstuff/mocks/consumer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7f02a64

Please sign in to comment.