-
Notifications
You must be signed in to change notification settings - Fork 181
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor Consensus Matching Engine: engine.Unit
-> ComponentManager
#6916
Changes from 5 commits
2dbeb84
a012534
9398df3
c7cba34
c312dd7
b53ff11
85e2881
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,8 @@ import ( | |
sealing "github.com/onflow/flow-go/engine/consensus" | ||
"github.com/onflow/flow-go/model/flow" | ||
"github.com/onflow/flow-go/module" | ||
"github.com/onflow/flow-go/module/component" | ||
"github.com/onflow/flow-go/module/irrecoverable" | ||
"github.com/onflow/flow-go/module/metrics" | ||
"github.com/onflow/flow-go/network" | ||
"github.com/onflow/flow-go/network/channels" | ||
|
@@ -27,7 +29,8 @@ const defaultIncorporatedBlockQueueCapacity = 10 | |
// Engine is a wrapper struct for `Core` which implements consensus algorithm. | ||
// Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals. | ||
type Engine struct { | ||
unit *engine.Unit | ||
component.Component | ||
cm *component.ComponentManager | ||
tim-barry marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log zerolog.Logger | ||
me module.Local | ||
core sealing.MatchingCore | ||
|
@@ -69,7 +72,6 @@ func NewEngine( | |
|
||
e := &Engine{ | ||
log: log.With().Str("engine", "matching.Engine").Logger(), | ||
unit: engine.NewUnit(), | ||
me: me, | ||
core: core, | ||
state: state, | ||
|
@@ -89,82 +91,44 @@ func NewEngine( | |
return nil, fmt.Errorf("could not register for results: %w", err) | ||
} | ||
|
||
return e, nil | ||
} | ||
|
||
// Ready returns a ready channel that is closed once the engine has fully | ||
// started. For consensus engine, this is true once the underlying consensus | ||
// algorithm has started. | ||
func (e *Engine) Ready() <-chan struct{} { | ||
e.unit.Launch(e.inboundEventsProcessingLoop) | ||
e.unit.Launch(e.finalizationProcessingLoop) | ||
e.unit.Launch(e.blockIncorporatedEventsProcessingLoop) | ||
return e.unit.Ready() | ||
} | ||
|
||
// Done returns a done channel that is closed once the engine has fully stopped. | ||
// For the consensus engine, we wait for hotstuff to finish. | ||
func (e *Engine) Done() <-chan struct{} { | ||
return e.unit.Done() | ||
} | ||
|
||
// SubmitLocal submits an event originating on the local node. | ||
func (e *Engine) SubmitLocal(event interface{}) { | ||
err := e.ProcessLocal(event) | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msg("internal error processing event") | ||
} | ||
} | ||
e.cm = component.NewComponentManagerBuilder(). | ||
AddWorker(e.inboundEventsProcessingLoop). | ||
AddWorker(e.finalizationProcessingLoop). | ||
AddWorker(e.blockIncorporatedEventsProcessingLoop). | ||
Build() | ||
e.Component = e.cm | ||
|
||
// Submit submits the given event from the node with the given origin ID | ||
// for processing in a non-blocking manner. It returns instantly and logs | ||
// a potential processing error internally when done. | ||
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { | ||
err := e.Process(channel, originID, event) | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msg("internal error processing event") | ||
} | ||
} | ||
|
||
// ProcessLocal processes an event originating on the local node. | ||
func (e *Engine) ProcessLocal(event interface{}) error { | ||
return e.process(e.me.NodeID(), event) | ||
return e, nil | ||
} | ||
|
||
// Process processes the given event from the node with the given origin ID in | ||
// a blocking manner. It returns the potential processing error when done. | ||
// Process receives events from the network and checks their type, | ||
// before enqueuing them to be processed by a worker in a non-blocking manner. | ||
// No errors expected during normal operation (errors are logged instead). | ||
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { | ||
err := e.process(originID, event) | ||
if err != nil { | ||
if engine.IsIncompatibleInputTypeError(err) { | ||
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel) | ||
return nil | ||
} | ||
return fmt.Errorf("unexpected error while processing engine message: %w", err) | ||
receipt, ok := event.(*flow.ExecutionReceipt) | ||
if !ok { | ||
e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel) | ||
return nil | ||
} | ||
e.addReceiptToQueue(receipt) | ||
return nil | ||
} | ||
|
||
// process events for the matching engine on the consensus node. | ||
func (e *Engine) process(originID flow.Identifier, event interface{}) error { | ||
receipt, ok := event.(*flow.ExecutionReceipt) | ||
if !ok { | ||
return fmt.Errorf("no matching processor for message of type %T from origin %x: %w", event, originID[:], | ||
engine.IncompatibleInputTypeError) | ||
} | ||
// addReceiptToQueue adds an execution receipt to the queue of the matching engine, to be processed by a worker | ||
func (e *Engine) addReceiptToQueue(receipt *flow.ExecutionReceipt) { | ||
e.metrics.MessageReceived(metrics.EngineSealing, metrics.MessageExecutionReceipt) | ||
e.pendingReceipts.Push(receipt) | ||
e.inboundEventsNotifier.Notify() | ||
return nil | ||
} | ||
|
||
// HandleReceipt ingests receipts from the Requester module. | ||
// HandleReceipt ingests receipts from the Requester module, adding them to the queue. | ||
func (e *Engine) HandleReceipt(originID flow.Identifier, receipt flow.Entity) { | ||
e.log.Debug().Msg("received receipt from requester engine") | ||
err := e.process(originID, receipt) | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msg("internal error processing event from requester module") | ||
r, ok := receipt.(*flow.ExecutionReceipt) | ||
if !ok { | ||
e.log.Fatal().Err(engine.IncompatibleInputTypeError).Msg("internal error processing event from requester module") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think that ideally HandleReceipt would also be able to use
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I agree that this could be done as part of updating the requester engine. It would be great to have type-safe handler functions in the requester engine, which we could implement by making the requester engine and its Create/Handle functions generic. |
||
} | ||
e.addReceiptToQueue(r) | ||
} | ||
|
||
// OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer` | ||
|
@@ -186,7 +150,7 @@ func (e *Engine) OnBlockIncorporated(incorporatedBlock *model.Block) { | |
// for further processing by matching core. | ||
tim-barry marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// Without the logic below, the sealing engine would produce IncorporatedResults | ||
// only from receipts received directly from ENs. sealing Core would not know about | ||
// Receipts that are incorporated by other nodes in their blocks blocks (but never | ||
// Receipts that are incorporated by other nodes in their blocks (but never | ||
// received directly from the EN). | ||
// No errors expected during normal operations. | ||
func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error { | ||
|
@@ -206,60 +170,63 @@ func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error { | |
} | ||
|
||
// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events | ||
func (e *Engine) finalizationProcessingLoop() { | ||
func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { | ||
finalizationNotifier := e.finalizationEventsNotifier.Channel() | ||
ready() | ||
for { | ||
select { | ||
case <-e.unit.Quit(): | ||
case <-ctx.Done(): | ||
return | ||
case <-finalizationNotifier: | ||
err := e.core.OnBlockFinalization() | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msg("could not process last finalized event") | ||
ctx.Throw(fmt.Errorf("could not process last finalized event: %w", err)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events. | ||
func (e *Engine) blockIncorporatedEventsProcessingLoop() { | ||
func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { | ||
c := e.blockIncorporatedNotifier.Channel() | ||
|
||
ready() | ||
for { | ||
select { | ||
case <-e.unit.Quit(): | ||
case <-ctx.Done(): | ||
return | ||
case <-c: | ||
err := e.processBlockIncorporatedEvents() | ||
err := e.processBlockIncorporatedEvents(ctx) | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message") | ||
ctx.Throw(fmt.Errorf("internal error processing block incorporated queued message: %w", err)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (e *Engine) inboundEventsProcessingLoop() { | ||
// inboundEventsProcessingLoop is a worker for processing execution receipts, received | ||
// from the network via Process, from the Requester module via HandleReceipt, or from incorporated blocks. | ||
tim-barry marked this conversation as resolved.
Show resolved
Hide resolved
|
||
func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { | ||
c := e.inboundEventsNotifier.Channel() | ||
|
||
ready() | ||
for { | ||
select { | ||
case <-e.unit.Quit(): | ||
case <-ctx.Done(): | ||
return | ||
case <-c: | ||
err := e.processAvailableEvents() | ||
err := e.processExecutionReceipts(ctx) | ||
if err != nil { | ||
e.log.Fatal().Err(err).Msg("internal error processing queued message") | ||
ctx.Throw(fmt.Errorf("internal error processing queued execution receipt: %w", err)) | ||
} | ||
} | ||
} | ||
} | ||
|
||
// processBlockIncorporatedEvents performs processing of block incorporated hot stuff events. | ||
// No errors expected during normal operations. | ||
func (e *Engine) processBlockIncorporatedEvents() error { | ||
func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContext) error { | ||
for { | ||
select { | ||
case <-e.unit.Quit(): | ||
case <-ctx.Done(): | ||
return nil | ||
default: | ||
} | ||
|
@@ -279,27 +246,18 @@ func (e *Engine) processBlockIncorporatedEvents() error { | |
} | ||
} | ||
|
||
// processAvailableEvents processes _all_ available events (untrusted messages | ||
// processExecutionReceipts processes execution receipts | ||
// from other nodes as well as internally trusted. | ||
// No errors expected during normal operations. | ||
func (e *Engine) processAvailableEvents() error { | ||
func (e *Engine) processExecutionReceipts(ctx irrecoverable.SignalerContext) error { | ||
for { | ||
select { | ||
case <-e.unit.Quit(): | ||
case <-ctx.Done(): | ||
return nil | ||
default: | ||
} | ||
|
||
msg, ok := e.pendingIncorporatedBlocks.Pop() | ||
if ok { | ||
err := e.processIncorporatedBlock(msg.(flow.Identifier)) | ||
if err != nil { | ||
return fmt.Errorf("could not process incorporated block: %w", err) | ||
} | ||
continue | ||
} | ||
|
||
msg, ok = e.pendingReceipts.Pop() | ||
msg, ok := e.pendingReceipts.Pop() | ||
if ok { | ||
err := e.core.ProcessReceipt(msg.(*flow.ExecutionReceipt)) | ||
if err != nil { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is an important best practice to register the component with the networking layer (👉 this code) only at the very end of the constructor, after we have fully constructed the component (including all the worker routines 👉 this code).
I like to frame this as a very foundational best practice in software engineering: don't reveal a reference to a component until the component is fully constructed. Usually, we implement this via a constructor that only at the end returns the reference to the constructed object. Our business logic is a bit more complicated, in that we don't want to rely on the caller to register the
matching.Engine
with the networking layer - so we do it within the constructor; however, then we have to make sure we are not revealing the pointer of the object under construction until the object instantiation is properly completed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That makes sense - I wasn't thinking deliberately enough about network interfaces, and was assuming the ComponentManager would want to be last due to the workers it starts potentially needing to access parts of the engine under construction for a similar reason; but basically forgot that the worker routines will only actually need those resources once the component is Started (after construction).