Refactor Consensus Matching Engine: engine.Unit -> ComponentManager #6916

Status: Open. Wants to merge 4 commits into base: master.
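For orientation, here is a minimal, hypothetical sketch of the lifecycle pattern this PR adopts: workers are registered on a ComponentManagerBuilder, each worker calls ready() once it is set up, exits when its context is cancelled, and escalates irrecoverable errors via ctx.Throw rather than log.Fatal. The ExampleEngine type below is invented for illustration; it uses only the component and irrecoverable APIs that appear in the diff.

```go
package example

import (
	"fmt"

	"github.com/onflow/flow-go/module/component"
	"github.com/onflow/flow-go/module/irrecoverable"
)

// ExampleEngine is a hypothetical engine illustrating the ComponentManager
// lifecycle that replaces engine.Unit in this PR.
type ExampleEngine struct {
	component.Component
	cm     *component.ComponentManager
	events chan struct{}
}

func NewExampleEngine() *ExampleEngine {
	e := &ExampleEngine{events: make(chan struct{}, 1)}
	// Each AddWorker call registers a goroutine managed by the ComponentManager.
	e.cm = component.NewComponentManagerBuilder().
		AddWorker(e.loop).
		Build()
	e.Component = e.cm
	return e
}

// loop signals readiness, shuts down on context cancellation, and reports
// irrecoverable errors through the SignalerContext instead of log.Fatal.
func (e *ExampleEngine) loop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	ready()
	for {
		select {
		case <-ctx.Done():
			return
		case <-e.events:
			if err := e.process(); err != nil {
				ctx.Throw(fmt.Errorf("could not process event: %w", err))
			}
		}
	}
}

func (e *ExampleEngine) process() error { return nil }
```

Startup then happens through Start(ctx) followed by waiting on Ready(), as the updated test file below does.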
140 changes: 49 additions & 91 deletions engine/consensus/matching/engine.go
@@ -11,6 +11,8 @@ import (
sealing "github.com/onflow/flow-go/engine/consensus"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
@@ -27,7 +29,8 @@ const defaultIncorporatedBlockQueueCapacity = 10
// Engine is a wrapper struct for `Core` which implements consensus algorithm.
// Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals.
type Engine struct {
unit *engine.Unit
component.Component
cm *component.ComponentManager
log zerolog.Logger
me module.Local
core sealing.MatchingCore
@@ -69,7 +72,6 @@ func NewEngine(

e := &Engine{
log: log.With().Str("engine", "matching.Engine").Logger(),
unit: engine.NewUnit(),
me: me,
core: core,
state: state,
@@ -89,82 +91,44 @@
return nil, fmt.Errorf("could not register for results: %w", err)
}

return e, nil
}

// Ready returns a ready channel that is closed once the engine has fully
// started. For consensus engine, this is true once the underlying consensus
// algorithm has started.
func (e *Engine) Ready() <-chan struct{} {
e.unit.Launch(e.inboundEventsProcessingLoop)
e.unit.Launch(e.finalizationProcessingLoop)
e.unit.Launch(e.blockIncorporatedEventsProcessingLoop)
return e.unit.Ready()
}

// Done returns a done channel that is closed once the engine has fully stopped.
// For the consensus engine, we wait for hotstuff to finish.
func (e *Engine) Done() <-chan struct{} {
return e.unit.Done()
}

// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
err := e.ProcessLocal(event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}
e.cm = component.NewComponentManagerBuilder().
AddWorker(e.inboundEventsProcessingLoop).
AddWorker(e.finalizationProcessingLoop).
AddWorker(e.blockIncorporatedEventsProcessingLoop).
Build()
e.Component = e.cm

// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
err := e.Process(channel, originID, event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}

// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.process(e.me.NodeID(), event)
return e, nil
}

// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
// Process receives events from the network and checks their type,
// before enqueuing them to be processed by a worker in a non-blocking manner.
// No errors expected during normal operation (errors are logged instead).
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
err := e.process(originID, event)
if err != nil {
if engine.IsIncompatibleInputTypeError(err) {
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
return nil
}
return fmt.Errorf("unexpected error while processing engine message: %w", err)
receipt, ok := event.(*flow.ExecutionReceipt)
if !ok {
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel)
return nil
}
e.addReceiptToQueue(receipt)
return nil
}

// process events for the matching engine on the consensus node.
func (e *Engine) process(originID flow.Identifier, event interface{}) error {
receipt, ok := event.(*flow.ExecutionReceipt)
if !ok {
return fmt.Errorf("no matching processor for message of type %T from origin %x: %w", event, originID[:],
engine.IncompatibleInputTypeError)
}
// addReceiptToQueue adds an execution receipt to the queue of the matching engine, to be processed by a worker
func (e *Engine) addReceiptToQueue(receipt *flow.ExecutionReceipt) {
e.metrics.MessageReceived(metrics.EngineSealing, metrics.MessageExecutionReceipt)
e.pendingReceipts.Push(receipt)
e.inboundEventsNotifier.Notify()
return nil
}

// HandleReceipt ingests receipts from the Requester module.
// HandleReceipt ingests receipts from the Requester module, adding them to the queue.
func (e *Engine) HandleReceipt(originID flow.Identifier, receipt flow.Entity) {
e.log.Debug().Msg("received receipt from requester engine")
err := e.process(originID, receipt)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event from requester module")
r, ok := receipt.(*flow.ExecutionReceipt)
if !ok {
e.log.Fatal().Err(engine.IncompatibleInputTypeError).Msg("internal error processing event from requester module")
Contributor Author:
I think that ideally HandleReceipt would also be able to use ctx.Throw instead of log.Fatal, but the function signature is already dictated by its use as a HandleFunc by the Requester engine. This could be pushed to a future refactor of the Requester engine (since that one also still uses engine.Unit).

Member:
I agree that this could be done as part of updating the requester engine. It would be great to have type-safe handler functions in the requester engine, which we could implement by making the requester engine and its Create/Handle functions generic.
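For illustration only (the requester engine's actual Create/Handle types may differ), a type-safe, generic variant could look roughly like this:

```go
package requestersketch

import "github.com/onflow/flow-go/model/flow"

// Hypothetical generic counterparts of the requester engine's handler types,
// parameterized by the concrete entity type being requested.
type CreateFunc[T flow.Entity] func() T
type HandleFunc[T flow.Entity] func(originID flow.Identifier, entity T)

// With such a requester, the matching engine could register a handler that
// receives *flow.ExecutionReceipt directly, removing the type assertion and
// the log.Fatal path in HandleReceipt.
func handleReceipt(originID flow.Identifier, receipt *flow.ExecutionReceipt) {
	_ = originID
	_ = receipt // enqueue the receipt for processing
}

var _ HandleFunc[*flow.ExecutionReceipt] = handleReceipt
```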

}
e.addReceiptToQueue(r)
}

// OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer`
@@ -186,7 +150,7 @@ func (e *Engine) OnBlockIncorporated(incorporatedBlock *model.Block) {
// for further processing by matching core.
// Without the logic below, the sealing engine would produce IncorporatedResults
// only from receipts received directly from ENs. sealing Core would not know about
// Receipts that are incorporated by other nodes in their blocks blocks (but never
// Receipts that are incorporated by other nodes in their blocks (but never
// received directly from the EN).
// No errors expected during normal operations.
func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error {
@@ -206,60 +170,63 @@ func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error {
}

// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events
func (e *Engine) finalizationProcessingLoop() {
func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
finalizationNotifier := e.finalizationEventsNotifier.Channel()
ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-finalizationNotifier:
err := e.core.OnBlockFinalization()
if err != nil {
e.log.Fatal().Err(err).Msg("could not process last finalized event")
ctx.Throw(fmt.Errorf("could not process last finalized event: %w", err))
}
}
}
}

// blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events.
func (e *Engine) blockIncorporatedEventsProcessingLoop() {
func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
c := e.blockIncorporatedNotifier.Channel()

ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-c:
err := e.processBlockIncorporatedEvents()
err := e.processBlockIncorporatedEvents(ctx)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message")
ctx.Throw(fmt.Errorf("internal error processing block incorporated queued message: %w", err))
}
}
}
}

func (e *Engine) inboundEventsProcessingLoop() {
// inboundEventsProcessingLoop is a worker for processing execution receipts, received
// from the network via Process, from the Requester module via HandleReceipt, or from incorporated blocks.
func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
c := e.inboundEventsNotifier.Channel()

ready()
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return
case <-c:
err := e.processAvailableEvents()
err := e.processExecutionReceipts(ctx)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing queued message")
ctx.Throw(fmt.Errorf("internal error processing queued execution receipt: %w", err))
}
}
}
}

// processBlockIncorporatedEvents performs processing of block incorporated hot stuff events.
// No errors expected during normal operations.
func (e *Engine) processBlockIncorporatedEvents() error {
func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContext) error {
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return nil
default:
}
@@ -279,27 +246,18 @@ func (e *Engine) processBlockIncorporatedEvents() error {
}
}

// processAvailableEvents processes _all_ available events (untrusted messages
// processExecutionReceipts processes execution receipts
// from other nodes as well as internally trusted.
// No errors expected during normal operations.
func (e *Engine) processAvailableEvents() error {
func (e *Engine) processExecutionReceipts(ctx irrecoverable.SignalerContext) error {
for {
select {
case <-e.unit.Quit():
case <-ctx.Done():
return nil
default:
}

msg, ok := e.pendingIncorporatedBlocks.Pop()
if ok {
err := e.processIncorporatedBlock(msg.(flow.Identifier))
if err != nil {
return fmt.Errorf("could not process incorporated block: %w", err)
}
continue
}

msg, ok = e.pendingReceipts.Pop()
msg, ok := e.pendingReceipts.Pop()
if ok {
err := e.core.ProcessReceipt(msg.(*flow.ExecutionReceipt))
if err != nil {
14 changes: 7 additions & 7 deletions engine/consensus/matching/engine_test.go
@@ -1,6 +1,7 @@
package matching

import (
"context"
"sync"
"testing"
"time"
@@ -10,9 +11,9 @@ import (
"github.com/stretchr/testify/suite"

"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine"
mockconsensus "github.com/onflow/flow-go/engine/consensus/mock"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
mockmodule "github.com/onflow/flow-go/module/mock"
"github.com/onflow/flow-go/network/channels"
@@ -57,6 +58,8 @@ func (s *MatchingEngineSuite) SetupTest() {
s.engine, err = NewEngine(unittest.Logger(), net, me, metrics, metrics, s.state, s.receipts, s.index, s.core)
require.NoError(s.T(), err)

ctx := irrecoverable.NewMockSignalerContext(s.T(), context.Background())
Member:
Could you also add teardown logic to make sure we are gracefully stopping the engine?

  • Use NewMockSignalerContextWithCancel and store the CancelFunc in the test suite struct
  • Add a TearDownTest function, call cancel() and wait for engine.Done using AssertClosesBefore (see the sketch below)
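A rough sketch of that teardown, assuming the suite gains a cancel field and that irrecoverable.NewMockSignalerContextWithCancel and unittest.AssertClosesBefore behave as in flow-go's test utilities (the time and unittest packages would also need to be imported):

```go
func (s *MatchingEngineSuite) SetupTest() {
	// ... existing setup ...
	ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background())
	s.cancel = cancel // context.CancelFunc field added to the suite struct
	s.engine.Start(ctx)
	unittest.AssertClosesBefore(s.T(), s.engine.Ready(), time.Second)
}

func (s *MatchingEngineSuite) TearDownTest() {
	// cancel the engine's context and wait for a graceful shutdown
	s.cancel()
	unittest.AssertClosesBefore(s.T(), s.engine.Done(), time.Second)
}
```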

s.engine.Start(ctx)
<-s.engine.Ready()
Member:
Could you wrap this in AssertClosesBefore?
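For example: `unittest.AssertClosesBefore(s.T(), s.engine.Ready(), time.Second)`, mirroring the teardown sketch above; the one-second timeout is an arbitrary choice.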

}

@@ -135,15 +138,12 @@ func (s *MatchingEngineSuite) TestMultipleProcessingItems() {
s.core.AssertExpectations(s.T())
}

// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type
// was submitted from network layer.
// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type
// (byzantine message) was submitted from network layer.
func (s *MatchingEngineSuite) TestProcessUnsupportedMessageType() {
invalidEvent := uint64(42)
err := s.engine.Process("ch", unittest.IdentifierFixture(), invalidEvent)
// shouldn't result in error since byzantine inputs are expected
require.NoError(s.T(), err)
// in case of local processing error cannot be consumed since all inputs are trusted
err = s.engine.ProcessLocal(invalidEvent)
require.Error(s.T(), err)
require.True(s.T(), engine.IsIncompatibleInputTypeError(err))
// Local processing happens only via HandleReceipt, which will log.Fatal on invalid input
}