Skip to content

Commit

Permalink
Connected telemetry and slashing consumers to consensus distributors.
Browse files Browse the repository at this point in the history
  • Loading branch information
durkmurder committed May 9, 2023
1 parent fcb48c3 commit d11f517
Show file tree
Hide file tree
Showing 16 changed files with 51 additions and 33 deletions.
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN

func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder {
dist := consensuspubsub.NewFollowerDistributor()
dist.AddProtocolViolationConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
dist.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
return &FlowAccessNodeBuilder{
AccessNodeConfig: DefaultAccessNodeConfig(),
FlowNodeBuilder: nodeBuilder,
Expand Down
2 changes: 1 addition & 1 deletion cmd/collection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func main() {
PreInit(cmd.DynamicStartPreInit).
Module("follower distributor", func(node *cmd.NodeConfig) error {
followerDistributor = pubsub.NewFollowerDistributor()
followerDistributor.AddProtocolViolationConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
followerDistributor.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
return nil
}).
Module("mutable follower state", func(node *cmd.NodeConfig) error {
Expand Down
18 changes: 14 additions & 4 deletions cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,6 @@ func main() {
}).
Module("follower distributor", func(node *cmd.NodeConfig) error {
followerDistributor = pubsub.NewFollowerDistributor()
followerDistributor.AddProtocolViolationConsumer(notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger))
return nil
}).
Module("machine account config", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -554,12 +553,17 @@ func main() {
// create consensus logger
logger := createLogger(node.Logger, node.RootChainID)

telemetryConsumer := notifications.NewTelemetryConsumer(logger)
slashingViolationConsumer := notifications.NewSlashingViolationsConsumer(nodeBuilder.Logger)
followerDistributor.AddProposalViolationConsumer(slashingViolationConsumer)

// initialize a logging notifier for hotstuff
notifier := createNotifier(
logger,
mainMetrics,
)

notifier.AddParticipantConsumer(telemetryConsumer)
notifier.AddFollowerConsumer(followerDistributor)

// initialize the persister
Expand All @@ -582,8 +586,11 @@ func main() {
return nil, err
}

// TODO: connect to slashing violation consumer
// create producer and connect it to consumers
voteAggregationDistributor := pubsub.NewVoteAggregationDistributor()
voteAggregationDistributor.AddVoteCollectorConsumer(telemetryConsumer)
voteAggregationDistributor.AddVoteAggregationViolationConsumer(slashingViolationConsumer)

validator := consensus.NewValidator(mainMetrics, wrappedCommittee)
voteProcessorFactory := votecollector.NewCombinedVoteProcessorFactory(wrappedCommittee, voteAggregationDistributor.OnQcConstructedFromVotes)
lowestViewForVoteProcessing := finalizedBlock.View + 1
Expand All @@ -600,8 +607,11 @@ func main() {
return nil, fmt.Errorf("could not initialize vote aggregator: %w", err)
}

// TODO: connect to slashing violation consumer
// create producer and connect it to consumers
timeoutAggregationDistributor := pubsub.NewTimeoutAggregationDistributor()
timeoutAggregationDistributor.AddTimeoutCollectorConsumer(telemetryConsumer)
timeoutAggregationDistributor.AddTimeoutAggregationViolationConsumer(slashingViolationConsumer)

timeoutProcessorFactory := timeoutcollector.NewTimeoutProcessorFactory(
logger,
timeoutAggregationDistributor,
Expand All @@ -628,7 +638,7 @@ func main() {
Committee: wrappedCommittee,
Signer: signer,
Persist: persist,
QCCreatedDistributor: voteAggregationDistributor.VoteCollectorDistributor,
VoteCollectorDistributor: voteAggregationDistributor.VoteCollectorDistributor,
FollowerDistributor: followerDistributor,
TimeoutCollectorDistributor: timeoutAggregationDistributor.TimeoutCollectorDistributor,
Forks: forks,
Expand Down
2 changes: 0 additions & 2 deletions cmd/consensus/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ func createLogger(log zerolog.Logger, chainID flow.ChainID) zerolog.Logger {

// createNotifier creates a pubsub distributor and connects it to consensus consumers.
func createNotifier(log zerolog.Logger, metrics module.HotstuffMetrics) *pubsub.Distributor {
telemetryConsumer := notifications.NewTelemetryConsumer(log)
metricsConsumer := metricsconsumer.NewMetricsConsumer(metrics)
logsConsumer := notifications.NewLogConsumer(log)
dis := pubsub.NewDistributor()
dis.AddConsumer(telemetryConsumer)
dis.AddConsumer(metricsConsumer)
dis.AddConsumer(logsConsumer)
return dis
Expand Down
2 changes: 1 addition & 1 deletion cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (exeNode *ExecutionNode) LoadExecutionReceiptsStorage(

func (exeNode *ExecutionNode) LoadFollowerDistributor(node *NodeConfig) error {
exeNode.followerDistributor = pubsub.NewFollowerDistributor()
exeNode.followerDistributor.AddProtocolViolationConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
exeNode.followerDistributor.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func NewFlowObserverServiceBuilder(opts ...Option) *ObserverServiceBuilder {
FlowNodeBuilder: cmd.FlowNode("observer"),
FollowerDistributor: pubsub.NewFollowerDistributor(),
}
anb.FollowerDistributor.AddProtocolViolationConsumer(notifications.NewSlashingViolationsConsumer(anb.Logger))
anb.FollowerDistributor.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(anb.Logger))
// the observer gets a version of the root snapshot file that does not contain any node addresses
// hence skip all the root snapshot validations that involved an identity address
anb.FlowNodeBuilder.SkipNwAddressBasedValidations = true
Expand Down
2 changes: 1 addition & 1 deletion cmd/verification_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (v *VerificationNodeBuilder) LoadComponentsAndModules() {
}).
Module("follower distributor", func(node *NodeConfig) error {
followerDistributor = pubsub.NewFollowerDistributor()
followerDistributor.AddProtocolViolationConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
followerDistributor.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(node.Logger))
return nil
}).
Module("sync core", func(node *NodeConfig) error {
Expand Down
22 changes: 11 additions & 11 deletions consensus/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ import (
// HotstuffModules is a helper structure to encapsulate dependencies to create
// a hotStuff participant.
type HotstuffModules struct {
Committee hotstuff.DynamicCommittee // consensus committee
Signer hotstuff.Signer // signer of proposal & votes
Persist hotstuff.Persister // last state of consensus participant
Notifier *pubsub.Distributor // observer for hotstuff events
FollowerDistributor *pubsub.FollowerDistributor // observer for finalization events, used by compliance engine
QCCreatedDistributor *pubsub.VoteCollectorDistributor // observer for qc created event, used by leader
TimeoutCollectorDistributor *pubsub.TimeoutCollectorDistributor
Forks hotstuff.Forks // information about multiple forks
Validator hotstuff.Validator // validator of proposals & votes
VoteAggregator hotstuff.VoteAggregator // aggregator of votes, used by leader
TimeoutAggregator hotstuff.TimeoutAggregator // aggregator of `TimeoutObject`s, used by every replica
Committee hotstuff.DynamicCommittee // consensus committee
Signer hotstuff.Signer // signer of proposal & votes
Persist hotstuff.Persister // last state of consensus participant
Notifier *pubsub.Distributor // observer for hotstuff events
FollowerDistributor *pubsub.FollowerDistributor // observer for finalization events, used by compliance engine
VoteCollectorDistributor *pubsub.VoteCollectorDistributor // observer for vote aggregation events, used by leader
TimeoutCollectorDistributor *pubsub.TimeoutCollectorDistributor // observer for timeout aggregation events
Forks hotstuff.Forks // information about multiple forks
Validator hotstuff.Validator // validator of proposals & votes
VoteAggregator hotstuff.VoteAggregator // aggregator of votes, used by leader
TimeoutAggregator hotstuff.TimeoutAggregator // aggregator of `TimeoutObject`s, used by every replica
}

type ParticipantConfig struct {
Expand Down
4 changes: 2 additions & 2 deletions consensus/hotstuff/notifications/pubsub/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewFollowerDistributor() *FollowerDistributor {

func (d *FollowerDistributor) AddFollowerConsumer(consumer hotstuff.FollowerConsumer) {
d.FinalizationDistributor.AddFinalizationConsumer(consumer)
d.ProtocolViolationDistributor.AddProtocolViolationConsumer(consumer)
d.ProtocolViolationDistributor.AddProposalViolationConsumer(consumer)
}

type TimeoutAggregationDistributor struct {
Expand Down Expand Up @@ -85,5 +85,5 @@ func NewVoteAggregationDistributor() *VoteAggregationDistributor {

func (d *VoteAggregationDistributor) AddVoteAggregationConsumer(consumer hotstuff.VoteAggregationConsumer) {
d.VoteAggregationViolationDistributor.AddVoteAggregationViolationConsumer(consumer)
d.VoteCollectorDistributor.AddQCCreatedConsumer(consumer)
d.VoteCollectorDistributor.AddVoteCollectorConsumer(consumer)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func NewProtocolViolationDistributor() *ProtocolViolationDistributor {
return &ProtocolViolationDistributor{}
}

func (d *ProtocolViolationDistributor) AddProtocolViolationConsumer(consumer hotstuff.ProposalViolationConsumer) {
func (d *ProtocolViolationDistributor) AddProposalViolationConsumer(consumer hotstuff.ProposalViolationConsumer) {
d.lock.Lock()
defer d.lock.Unlock()
d.subscribers = append(d.subscribers, consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewQCCreatedDistributor() *VoteCollectorDistributor {
return &VoteCollectorDistributor{}
}

func (d *VoteCollectorDistributor) AddQCCreatedConsumer(consumer hotstuff.VoteCollectorConsumer) {
func (d *VoteCollectorDistributor) AddVoteCollectorConsumer(consumer hotstuff.VoteCollectorConsumer) {
d.lock.Lock()
defer d.lock.Unlock()
d.consumers = append(d.consumers, consumer)
Expand Down
14 changes: 12 additions & 2 deletions consensus/hotstuff/notifications/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,15 @@ import (
//
// Telemetry does NOT capture slashing notifications
type TelemetryConsumer struct {
NoopConsumer
NoopTimeoutCollectorConsumer
NoopVoteCollectorConsumer
pathHandler *PathHandler
noPathLogger zerolog.Logger
}

var _ hotstuff.Consumer = (*TelemetryConsumer)(nil)
var _ hotstuff.ParticipantConsumer = (*TelemetryConsumer)(nil)
var _ hotstuff.VoteCollectorConsumer = (*TelemetryConsumer)(nil)
var _ hotstuff.TimeoutCollectorConsumer = (*TelemetryConsumer)(nil)

// NewTelemetryConsumer creates consumer that reports telemetry events using logger backend.
// Logger MUST include `chain` parameter as part of log context with corresponding chain ID to correctly map telemetry events to chain.
Expand Down Expand Up @@ -240,6 +243,13 @@ func (t *TelemetryConsumer) OnCurrentViewDetails(currentView, finalizedView uint
Msg("OnCurrentViewDetails")
}

func (t *TelemetryConsumer) OnViewChange(oldView, newView uint64) {
t.pathHandler.NextStep().
Uint64("old_view", oldView).
Uint64("new_view", newView).
Msg("OnViewChange")
}

// PathHandler maintains a notion of the current path through the state machine.
// It allows to close a path and open new path. Each path is identified by a unique
// (randomly generated) uuid. Along each path, we can capture information about relevant
Expand Down
2 changes: 1 addition & 1 deletion consensus/integration/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func createNode(
Committee: committee,
Signer: signer,
Persist: persist,
QCCreatedDistributor: qcDistributor,
VoteCollectorDistributor: qcDistributor,
TimeoutCollectorDistributor: timeoutCollectorDistributor,
VoteAggregator: voteAggregator,
TimeoutAggregator: timeoutAggregator,
Expand Down
2 changes: 1 addition & 1 deletion consensus/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func NewParticipant(
}

// add observer, event loop needs to receive events from distributor
modules.QCCreatedDistributor.AddQCCreatedConsumer(loop)
modules.VoteCollectorDistributor.AddVoteCollectorConsumer(loop)
modules.TimeoutCollectorDistributor.AddTimeoutCollectorConsumer(loop)

return loop, nil
Expand Down
4 changes: 2 additions & 2 deletions engine/collection/epochmgr/factories/hotstuff.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (f *HotStuffFactory) CreateModules(
notifier.AddConsumer(notifications.NewLogConsumer(log))
notifier.AddConsumer(hotmetrics.NewMetricsConsumer(metrics))
notifier.AddConsumer(notifications.NewTelemetryConsumer(log))
notifier.AddProtocolViolationConsumer(notifications.NewSlashingViolationsConsumer(log))
notifier.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(log))

var (
err error
Expand Down Expand Up @@ -159,7 +159,7 @@ func (f *HotStuffFactory) CreateModules(
Persist: persister.New(f.db, cluster.ChainID()),
VoteAggregator: voteAggregator,
TimeoutAggregator: timeoutAggregator,
QCCreatedDistributor: qcDistributor,
VoteCollectorDistributor: qcDistributor,
TimeoutCollectorDistributor: timeoutCollectorDistributor,
FollowerDistributor: notifier.FollowerDistributor,
}, metrics, nil
Expand Down
2 changes: 1 addition & 1 deletion follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func FlowConsensusFollowerService(opts ...FollowerOption) *FollowerServiceBuilde
FlowNodeBuilder: cmd.FlowNode(flow.RoleAccess.String(), config.baseOptions...),
FollowerDistributor: pubsub.NewFollowerDistributor(),
}
ret.FollowerDistributor.AddProtocolViolationConsumer(notifications.NewSlashingViolationsConsumer(ret.Logger))
ret.FollowerDistributor.AddProposalViolationConsumer(notifications.NewSlashingViolationsConsumer(ret.Logger))
// the observer gets a version of the root snapshot file that does not contain any node addresses
// hence skip all the root snapshot validations that involved an identity address
ret.FlowNodeBuilder.SkipNwAddressBasedValidations = true
Expand Down

0 comments on commit d11f517

Please sign in to comment.