Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Consensus Matching Engine: engine.Unit -> ComponentManager #6916

Merged
140 changes: 49 additions & 91 deletions engine/consensus/matching/engine.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is an important best practice to register the component with the networking layer (👉 this code) only at the very end of the constructor, after we have fully constructed the component (including all the worker routines 👉 this code).

I like to frame this as a very foundational best practice in software engineering: don't reveal a reference to a component until the component is fully constructed. Usually, we implement this via a constructor that only at the end returns the reference to the constructed object. Our business logic is a bit more complicated, in that we don't want to rely on the caller to register the matching.Engine with the networking layer - so we do it within the constructor; however, then we have to make sure we are not revealing the pointer of the object under construction until the object instantiation is properly completed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense - I wasn't thinking deliberately enough about network interfaces, and was assuming the ComponentManager would want to be last due to the workers it starts potentially needing to access parts of the engine under construction for a similar reason; but basically forgot that the worker routines will only actually need those resources once the component is Started (after construction).

Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
sealing "github.com/onflow/flow-go/engine/consensus"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
Expand All @@ -27,7 +29,8 @@ const defaultIncorporatedBlockQueueCapacity = 10
// Engine is a wrapper struct for `Core` which implements consensus algorithm.
// Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals.
type Engine struct {
unit *engine.Unit
component.Component
cm *component.ComponentManager
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
log zerolog.Logger
me module.Local
core sealing.MatchingCore
Expand Down Expand Up @@ -69,7 +72,6 @@ func NewEngine(

e := &Engine{
log: log.With().Str("engine", "matching.Engine").Logger(),
unit: engine.NewUnit(),
me: me,
core: core,
state: state,
Expand All @@ -89,82 +91,44 @@ func NewEngine(
return nil, fmt.Errorf("could not register for results: %w", err)
}

return e, nil
}

// Ready returns a ready channel that is closed once the engine has fully
// started. For consensus engine, this is true once the underlying consensus
// algorithm has started.
func (e *Engine) Ready() <-chan struct{} {
e.unit.Launch(e.inboundEventsProcessingLoop)
e.unit.Launch(e.finalizationProcessingLoop)
e.unit.Launch(e.blockIncorporatedEventsProcessingLoop)
return e.unit.Ready()
}

// Done returns a done channel that is closed once the engine has fully stopped.
// For the consensus engine, we wait for hotstuff to finish.
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done()
}

// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
err := e.ProcessLocal(event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}
e.cm = component.NewComponentManagerBuilder().
AddWorker(e.inboundEventsProcessingLoop).
AddWorker(e.finalizationProcessingLoop).
AddWorker(e.blockIncorporatedEventsProcessingLoop).
Build()
e.Component = e.cm

// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
err := e.Process(channel, originID, event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}

// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.process(e.me.NodeID(), event)
return e, nil
}

// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
// Process receives events from the network and checks their type,
// before enqueuing them to be processed by a worker in a non-blocking manner.
// No errors expected during normal operation (errors are logged instead).
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
err := e.process(originID, event)
if err != nil {
if engine.IsIncompatibleInputTypeError(err) {
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
return nil
}
return fmt.Errorf("unexpected error while processing engine message: %w", err)
receipt, ok := event.(*flow.ExecutionReceipt)
if !ok {
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
return nil
}
e.addReceiptToQueue(receipt)
return nil
}

// process events for the matching engine on the consensus node.
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
receipt, ok := event.(*flow.ExecutionReceipt)
if !ok {
return fmt.Errorf("no matching processor for message of type %T from origin %x: %w", event, originID[:],
engine.IncompatibleInputTypeError)
}
// addReceiptToQueue adds an execution receipt to the queue of the matching engine, to be processed by a worker
func (e *Engine) addReceiptToQueue(receipt *flow.ExecutionReceipt) {
e.metrics.MessageReceived(metrics.EngineSealing, metrics.MessageExecutionReceipt)
e.pendingReceipts.Push(receipt)
e.inboundEventsNotifier.Notify()
return nil
}

// HandleReceipt ingests receipts from the Requester module.
// HandleReceipt ingests receipts from the Requester module, adding them to the queue.
func (e *Engine) HandleReceipt(originID flow.Identifier, receipt flow.Entity) {
e.log.Debug().Msg("received receipt from requester engine")
err := e.process(originID, receipt)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event from requester module")
r, ok := receipt.(*flow.ExecutionReceipt)
if !ok {
e.log.Fatal().Err(engine.IncompatibleInputTypeError).Msg("internal error processing event from requester module")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that ideally HandleReceipt would also be able to use ctx.Throw instead of Log.Fatal, but the function type is already dictated by being used as a HandleFunc by the Requester engine. Could be pushed to a future refactor of Requester Engine (since that one also still uses engine.Unit).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that this could be done as part of updating the requester engine. It would be great to have type-safe handler functions in the requester engine, which we could implement by making the requester engine and its Create/Handle functions generic.

}
e.addReceiptToQueue(r)
}

// OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer`
Expand All @@ -186,7 +150,7 @@ func (e *Engine) OnBlockIncorporated(incorporatedBlock *model.Block) {
// for further processing by matching core.
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
// Without the logic below, the sealing engine would produce IncorporatedResults
// only from receipts received directly from ENs. sealing Core would not know about
// Receipts that are incorporated by other nodes in their blocks blocks (but never
// Receipts that are incorporated by other nodes in their blocks (but never
// received directly from the EN).
// No errors expected during normal operations.
func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error {
Expand All @@ -206,60 +170,63 @@ func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error {
}

// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events
func (e *Engine) finalizationProcessingLoop() {
func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
finalizationNotifier := e.finalizationEventsNotifier.Channel()
ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-finalizationNotifier:
err := e.core.OnBlockFinalization()
if err != nil {
e.log.Fatal().Err(err).Msg("could not process last finalized event")
ctx.Throw(fmt.Errorf("could not process last finalized event: %w", err))
}
}
}
}

// blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events.
func (e *Engine) blockIncorporatedEventsProcessingLoop() {
func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
c := e.blockIncorporatedNotifier.Channel()

ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-c:
err := e.processBlockIncorporatedEvents()
err := e.processBlockIncorporatedEvents(ctx)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message")
ctx.Throw(fmt.Errorf("internal error processing block incorporated queued message: %w", err))
}
}
}
}

func (e *Engine) inboundEventsProcessingLoop() {
// inboundEventsProcessingLoop is a worker for processing execution receipts, received
// from the network via Process, from the Requester module via HandleReceipt, or from incorporated blocks.
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
c := e.inboundEventsNotifier.Channel()

ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-c:
err := e.processAvailableEvents()
err := e.processExecutionReceipts(ctx)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing queued message")
ctx.Throw(fmt.Errorf("internal error processing queued execution receipt: %w", err))
}
}
}
}

// processBlockIncorporatedEvents performs processing of block incorporated hot stuff events.
// No errors expected during normal operations.
func (e *Engine) processBlockIncorporatedEvents() error {
func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContext) error {
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return nil
default:
}
Expand All @@ -279,27 +246,18 @@ func (e *Engine) processBlockIncorporatedEvents() error {
}
}

// processAvailableEvents processes _all_ available events (untrusted messages
// processExecutionReceipts processes execution receipts
// from other nodes as well as internally trusted.
// No errors expected during normal operations.
func (e *Engine) processAvailableEvents() error {
func (e *Engine) processExecutionReceipts(ctx irrecoverable.SignalerContext) error {
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return nil
default:
}

msg, ok := e.pendingIncorporatedBlocks.Pop()
if ok {
err := e.processIncorporatedBlock(msg.(flow.Identifier))
if err != nil {
return fmt.Errorf("could not process incorporated block: %w", err)
}
continue
}

msg, ok = e.pendingReceipts.Pop()
msg, ok := e.pendingReceipts.Pop()
if ok {
err := e.core.ProcessReceipt(msg.(*flow.ExecutionReceipt))
if err != nil {
Expand Down
25 changes: 17 additions & 8 deletions engine/consensus/matching/engine_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package matching

import (
"context"
"sync"
"testing"
"time"
Expand All @@ -10,9 +11,9 @@ import (
"github.com/stretchr/testify/suite"

"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine"
mockconsensus "github.com/onflow/flow-go/engine/consensus/mock"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
mockmodule "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/network/channels"
Expand All @@ -36,6 +37,7 @@ type MatchingEngineSuite struct {

// Matching Engine
engine *Engine
cancel context.CancelFunc
}

func (s *MatchingEngineSuite) SetupTest() {
Expand All @@ -57,7 +59,17 @@ func (s *MatchingEngineSuite) SetupTest() {
s.engine, err = NewEngine(unittest.Logger(), net, me, metrics, metrics, s.state, s.receipts, s.index, s.core)
require.NoError(s.T(), err)

<-s.engine.Ready()
ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background())
s.cancel = cancel
s.engine.Start(ctx)
unittest.AssertClosesBefore(s.T(), s.engine.Ready(), 10*time.Millisecond)
}

// TearDownTest shuts down the engine that SetupTest started: it cancels the
// signaler context and asserts that the engine's Done channel closes within
// 10ms. If no cancel function was stored, there is nothing to shut down.
func (s *MatchingEngineSuite) TearDownTest() {
	if s.cancel == nil {
		return
	}
	s.cancel()
	unittest.AssertClosesBefore(s.T(), s.engine.Done(), 10*time.Millisecond)
}

// TestOnFinalizedBlock tests if a finalized block gets processed when sent through the `Engine`.
Expand Down Expand Up @@ -135,15 +147,12 @@ func (s *MatchingEngineSuite) TestMultipleProcessingItems() {
s.core.AssertExpectations(s.T())
}

// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type
// was submitted from network layer.
// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type
// (byzantine message) was submitted from network layer.
func (s *MatchingEngineSuite) TestProcessUnsupportedMessageType() {
invalidEvent := uint64(42)
err := s.engine.Process("ch", unittest.IdentifierFixture(), invalidEvent)
// shouldn't result in error since byzantine inputs are expected
require.NoError(s.T(), err)
// in case of local processing error cannot be consumed since all inputs are trusted
err = s.engine.ProcessLocal(invalidEvent)
require.Error(s.T(), err)
require.True(s.T(), engine.IsIncompatibleInputTypeError(err))
// Local processing happens only via HandleReceipt, which will log.Fatal on invalid input
}
Loading