From bea14dc18a3271008f465c8752577064765c688a Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Fri, 3 Nov 2023 10:29:04 +0100 Subject: [PATCH 01/12] go/worker/compute/executor: Improve logging --- .../cometbft/apps/roothash/finalization.go | 15 ++++++++++++--- go/consensus/cometbft/apps/roothash/roothash.go | 1 + go/consensus/cometbft/apps/roothash/timeout.go | 5 ++++- .../cometbft/apps/roothash/transactions.go | 6 +++++- .../compute/executor/committee/discrepancy.go | 9 ++++++--- go/worker/compute/executor/committee/node.go | 3 ++- go/worker/compute/executor/committee/p2p.go | 2 ++ 7 files changed, 32 insertions(+), 9 deletions(-) diff --git a/go/consensus/cometbft/apps/roothash/finalization.go b/go/consensus/cometbft/apps/roothash/finalization.go index 9bfd316aa1a..b622ea851a2 100644 --- a/go/consensus/cometbft/apps/roothash/finalization.go +++ b/go/consensus/cometbft/apps/roothash/finalization.go @@ -82,8 +82,10 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo switch err { case commitment.ErrDiscrepancyDetected: ctx.Logger().Warn("executor discrepancy detected", + "runtime_id", rtState.Runtime.ID, "round", round, "rank", rtState.CommitmentPool.HighestRank, + "timeout", timeout, logging.LogEvent, roothash.LogEventExecutionDiscrepancyDetected, ) @@ -97,7 +99,7 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo prevTimeout := rtState.NextTimeout rtState.NextTimeout = ctx.BlockHeight() + 1 + (rtState.Runtime.Executor.RoundTimeout*backupWorkerTimeoutFactorNumerator)/backupWorkerTimeoutFactorDenominator // Current height is ctx.BlockHeight() + 1 - if err = rearmRoundTimeout(ctx, rtState.Runtime.ID, prevTimeout, rtState.NextTimeout); err != nil { + if err = rearmRoundTimeout(ctx, rtState.Runtime.ID, round, prevTimeout, rtState.NextTimeout); err != nil { return err } @@ -114,6 +116,7 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo case commitment.ErrStillWaiting: // Need more commits. ctx.Logger().Debug("insufficient commitments for finality, waiting", + "runtime_id", rtState.Runtime.ID, "round", round, ) return nil @@ -132,8 +135,11 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo // The round has been finalized. ctx.Logger().Debug("finalized round", + "runtime_id", rtState.Runtime.ID, "round", round, - "priority", pool.HighestRank, + "rank", pool.HighestRank, + "scheduler_id", sc.Commitment.Header.SchedulerID, + "timeout", timeout, ) livenessStats.TotalRounds++ @@ -231,6 +237,8 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo switch rtState.CommitmentPool.Discrepancy { case true: ctx.Logger().Debug("executor pool discrepancy", + "runtime_id", rtState.Runtime.ID, + "round", round, "slashing", rtState.Runtime.Staking.Slashing, ) @@ -316,7 +324,7 @@ func (app *rootHashApplication) finalizeBlock(ctx *tmapi.Context, rtState *rooth prevTimeout := rtState.NextTimeout rtState.NextTimeout = roothash.TimeoutNever - return rearmRoundTimeout(ctx, rtState.Runtime.ID, prevTimeout, rtState.NextTimeout) + return rearmRoundTimeout(ctx, rtState.Runtime.ID, blk.Header.Round, prevTimeout, rtState.NextTimeout) } func (app *rootHashApplication) failRound( @@ -327,6 +335,7 @@ func (app *rootHashApplication) failRound( round := rtState.LastBlock.Header.Round + 1 ctx.Logger().Debug("round failed", + "runtime_id", rtState.Runtime.ID, "round", round, "err", err, logging.LogEvent, roothash.LogEventRoundFailed, diff --git a/go/consensus/cometbft/apps/roothash/roothash.go b/go/consensus/cometbft/apps/roothash/roothash.go index c890a257cef..43c7d0d0b28 100644 --- a/go/consensus/cometbft/apps/roothash/roothash.go +++ b/go/consensus/cometbft/apps/roothash/roothash.go @@ -201,6 +201,7 @@ func (app *rootHashApplication) onCommitteeChanged(ctx *tmapi.Context, state *ro ctx.Logger().Debug("updating committee for runtime", "runtime_id", rt.ID, "epoch", epoch, + "committee", committee, ) // Emit an empty block signaling epoch transition. This is required so that diff --git a/go/consensus/cometbft/apps/roothash/timeout.go b/go/consensus/cometbft/apps/roothash/timeout.go index 2dd949cbe05..9b936d3ca05 100644 --- a/go/consensus/cometbft/apps/roothash/timeout.go +++ b/go/consensus/cometbft/apps/roothash/timeout.go @@ -35,11 +35,13 @@ func (app *rootHashApplication) processRoundTimeouts(ctx *tmapi.Context) error { func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace) error { ctx.Logger().Warn("round timeout expired, forcing finalization", + "runtime_id", runtimeID, logging.LogEvent, roothash.LogEventTimerFired, ) if err := app.tryFinalizeRound(ctx, runtimeID, true); err != nil { ctx.Logger().Error("failed to finalize round", + "runtime_id", runtimeID, "err", err, ) return fmt.Errorf("failed to finalize round: %w", err) @@ -48,7 +50,7 @@ func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, runtimeI return nil } -func rearmRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace, prevTimeout int64, nextTimeout int64) error { +func rearmRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace, round uint64, prevTimeout int64, nextTimeout int64) error { // Re-arm only if the round timeout has changed. if prevTimeout == nextTimeout { return nil @@ -56,6 +58,7 @@ func rearmRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace, prevTimeo ctx.Logger().Debug("re-arming round timeout", "runtime_id", runtimeID, + "round", round, "prev_timeout", prevTimeout, "next_timeout", nextTimeout, "height", ctx.BlockHeight()+1, // Current height is ctx.BlockHeight() + 1 diff --git a/go/consensus/cometbft/apps/roothash/transactions.go b/go/consensus/cometbft/apps/roothash/transactions.go index 5d64ea1f8ee..684742d315d 100644 --- a/go/consensus/cometbft/apps/roothash/transactions.go +++ b/go/consensus/cometbft/apps/roothash/transactions.go @@ -133,7 +133,11 @@ func (app *rootHashApplication) executorCommit( // Check if higher-ranked scheduler submitted a commitment. if prevRank != rtState.CommitmentPool.HighestRank { + round := rtState.LastBlock.Header.Round + 1 + ctx.Logger().Debug("transaction scheduler has changed", + "runtime_id", cc.ID, + "round", round, "prev_rank", prevRank, "new_rank", rtState.CommitmentPool.HighestRank, ) @@ -142,7 +146,7 @@ func (app *rootHashApplication) executorCommit( prevTimeout := rtState.NextTimeout rtState.NextTimeout = ctx.BlockHeight() + 1 + rtState.Runtime.Executor.RoundTimeout // Current height is ctx.BlockHeight() + 1 - if err := rearmRoundTimeout(ctx, cc.ID, prevTimeout, rtState.NextTimeout); err != nil { + if err := rearmRoundTimeout(ctx, cc.ID, round, prevTimeout, rtState.NextTimeout); err != nil { return err } } diff --git a/go/worker/compute/executor/committee/discrepancy.go b/go/worker/compute/executor/committee/discrepancy.go index 56eda83bef7..5107cc366f1 100644 --- a/go/worker/compute/executor/committee/discrepancy.go +++ b/go/worker/compute/executor/committee/discrepancy.go @@ -8,15 +8,17 @@ import ( ) type discrepancyEvent struct { - rank uint64 height uint64 + rank uint64 + timeout bool authoritative bool } func (n *Node) handleDiscrepancy(ctx context.Context, ev *discrepancyEvent) { n.logger.Warn("execution discrepancy detected", - "rank", ev.rank, "height", ev.height, + "rank", ev.rank, + "timeout", ev.timeout, "authoritative", ev.authoritative, ) @@ -81,8 +83,9 @@ func (n *Node) predictDiscrepancy(ctx context.Context, ec *commitment.ExecutorCo n.logger.Warn("observed commitments indicate discrepancy") n.handleDiscrepancy(ctx, &discrepancyEvent{ - rank: n.commitPool.HighestRank, height: uint64(n.blockInfo.ConsensusBlock.Height), + rank: n.commitPool.HighestRank, + timeout: false, authoritative: false, }) } diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index ca42bdb8363..c3abf37eaa9 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -1261,8 +1261,9 @@ func (n *Node) handleEvent(ctx context.Context, ev *roothash.Event) { switch { case ev.ExecutionDiscrepancyDetected != nil: n.handleDiscrepancy(ctx, &discrepancyEvent{ - rank: ev.ExecutionDiscrepancyDetected.Rank, height: uint64(ev.Height), + rank: ev.ExecutionDiscrepancyDetected.Rank, + timeout: ev.ExecutionDiscrepancyDetected.Timeout, authoritative: true, }) case ev.ExecutorCommitted != nil: diff --git a/go/worker/compute/executor/committee/p2p.go b/go/worker/compute/executor/committee/p2p.go index 04443b6eb11..5e7c913ef66 100644 --- a/go/worker/compute/executor/committee/p2p.go +++ b/go/worker/compute/executor/committee/p2p.go @@ -86,6 +86,8 @@ func (h *committeeMsgHandler) HandleMessage(_ context.Context, _ signature.Publi } h.n.logger.Debug("received a proposal", + "runtime_id", h.n.commonNode.Runtime.ID(), + "round", proposal.Header.Round, "node_id", proposal.NodeID, "rank", rank, "batch_size", len(proposal.Batch), From d6ae93321b9d8a5023c724ce249e72544af200a9 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Mon, 6 Nov 2023 10:16:44 +0100 Subject: [PATCH 02/12] go/worker/common/committee/group: Remove unused epoch and round ctx --- go/worker/common/committee/group.go | 28 ---------------------------- 1 file changed, 28 deletions(-) diff --git a/go/worker/common/committee/group.go b/go/worker/common/committee/group.go index 1633e6c5df2..4279cb85ddb 100644 --- a/go/worker/common/committee/group.go +++ b/go/worker/common/committee/group.go @@ -52,11 +52,6 @@ func (ci *CommitteeInfo) HasRole(role scheduler.Role) bool { } type epoch struct { - epochCtx context.Context - cancelEpochCtx context.CancelFunc - roundCtx context.Context - cancelRoundCtx context.CancelFunc - // epochNumber is the sequential number of the epoch. epochNumber beacon.EpochTime // epochHeight is the height at which the epoch transition happened. @@ -154,7 +149,6 @@ func (e *EpochSnapshot) Node(_ context.Context, id signature.PublicKey) (*node.N type Group struct { sync.RWMutex - ctx context.Context identity *identity.Identity runtime runtimeRegistry.Runtime @@ -176,12 +170,6 @@ func (g *Group) RoundTransition() { if g.activeEpoch == nil { return } - - (g.activeEpoch.cancelRoundCtx)() - - ctx, cancel := context.WithCancel(g.activeEpoch.epochCtx) - g.activeEpoch.roundCtx = ctx - g.activeEpoch.cancelRoundCtx = cancel } // Suspend processes a runtime suspension that just happened. @@ -195,8 +183,6 @@ func (g *Group) Suspend() { return } - // Cancel context for the previous epoch. - (g.activeEpoch.cancelEpochCtx)() // Invalidate current epoch. g.activeEpoch = nil } @@ -206,11 +192,6 @@ func (g *Group) EpochTransition(ctx context.Context, height int64) error { g.Lock() defer g.Unlock() - // Cancel context for the previous epoch. - if g.activeEpoch != nil { - (g.activeEpoch.cancelEpochCtx)() - } - // Invalidate current epoch. In case we cannot process this transition, // this should cause the node to transition into NotReady and stay there // until the next epoch transition. @@ -295,18 +276,10 @@ func (g *Group) EpochTransition(ctx context.Context, height int64) error { // Freeze the committee. g.nodes.Freeze(height) - // Create a new epoch and round contexts. - epochCtx, cancelEpochCtx := context.WithCancel(ctx) - roundCtx, cancelRoundCtx := context.WithCancel(epochCtx) - // Update the current epoch. g.activeEpoch = &epoch{ epochNumber: epochNumber, epochHeight: epochHeight, - epochCtx: epochCtx, - cancelEpochCtx: cancelEpochCtx, - roundCtx: roundCtx, - cancelRoundCtx: cancelRoundCtx, executorCommittee: executorCommittee, runtime: runtime, } @@ -358,7 +331,6 @@ func NewGroup( } return &Group{ - ctx: ctx, identity: identity, runtime: runtime, consensus: consensus, From fdbffdb37bae96e0def51e794a6ccfdb23e4bee0 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Fri, 3 Nov 2023 13:57:34 +0100 Subject: [PATCH 03/12] go/runtime/host/sgx: Extend periodic re-attestation interval The periodic re-attestation happens every hour, while explicitly requested re-attestation takes place approximately every 1 to 2 hours. This results in both re-attestations occurring closely together. To avoid unnecessary re-attestation and re-registration, we extend the periodic re-attestation interval to encompass the worst-case scenario of explicit attestation. --- go/runtime/host/sgx/sgx.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/runtime/host/sgx/sgx.go b/go/runtime/host/sgx/sgx.go index 34aa99fbaa5..5e15158ed58 100644 --- a/go/runtime/host/sgx/sgx.go +++ b/go/runtime/host/sgx/sgx.go @@ -45,7 +45,7 @@ const ( // nodes on a single machine, all sharing the same EPC. runtimeRAKTimeout = 60 * time.Second // Runtime attest interval. - defaultRuntimeAttestInterval = 1 * time.Hour + defaultRuntimeAttestInterval = 2 * time.Hour ) // Config contains SGX-specific provisioner configuration options. From 0a855068050db04eec2476844a4f129152ab4c54 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Sat, 4 Nov 2023 02:07:45 +0100 Subject: [PATCH 04/12] go/runtime/txpool: Change signature of WatchCheckedTransactions --- go/runtime/txpool/txpool.go | 6 +++--- go/worker/compute/executor/committee/node.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 858c100ffa2..60071dbc8a2 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -117,7 +117,7 @@ type TransactionPool interface { // WatchCheckedTransactions subscribes to notifications about new transactions being available // in the transaction pool for scheduling. - WatchCheckedTransactions() (pubsub.ClosableSubscription, <-chan []*PendingCheckTransaction) + WatchCheckedTransactions() (<-chan []*PendingCheckTransaction, pubsub.ClosableSubscription) // PendingCheckSize returns the number of transactions currently pending to be checked. PendingCheckSize() int @@ -406,11 +406,11 @@ func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) erro return nil } -func (t *txPool) WatchCheckedTransactions() (pubsub.ClosableSubscription, <-chan []*PendingCheckTransaction) { +func (t *txPool) WatchCheckedTransactions() (<-chan []*PendingCheckTransaction, pubsub.ClosableSubscription) { sub := t.checkTxNotifier.Subscribe() ch := make(chan []*PendingCheckTransaction) sub.Unwrap(ch) - return sub, ch + return ch, sub } func (t *txPool) PendingCheckSize() int { diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index c3abf37eaa9..641dcf35cc2 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -1437,7 +1437,7 @@ func (n *Node) worker() { ) // Subscribe to notifications of new transactions being available in the pool. - txSub, n.txCh = n.commonNode.TxPool.WatchCheckedTransactions() + n.txCh, txSub = n.commonNode.TxPool.WatchCheckedTransactions() defer txSub.Close() // Subscribe to gossiped executor commitments. From 4199369a421fb0ec99800e31091f543d7420e0ba Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Sun, 5 Nov 2023 21:12:50 +0100 Subject: [PATCH 05/12] go/runtime/txpool: Update lastRecheckRound on epoch transitions --- go/runtime/txpool/txpool.go | 25 +++++++++---------------- go/worker/common/committee/node.go | 14 ++------------ 2 files changed, 11 insertions(+), 28 deletions(-) diff --git a/go/runtime/txpool/txpool.go b/go/runtime/txpool/txpool.go index 60071dbc8a2..b5d37b4a6f0 100644 --- a/go/runtime/txpool/txpool.go +++ b/go/runtime/txpool/txpool.go @@ -110,10 +110,10 @@ type TransactionPool interface { GetKnownBatch(batch []hash.Hash) ([]*TxQueueMeta, map[hash.Hash]int) // ProcessBlock updates the last known runtime block information. - ProcessBlock(bi *runtime.BlockInfo) error + ProcessBlock(bi *runtime.BlockInfo) // ProcessIncomingMessages loads transactions from incoming messages into the pool. - ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error + ProcessIncomingMessages(inMsgs []*message.IncomingMessage) // WatchCheckedTransactions subscribes to notifications about new transactions being available // in the transaction pool for scheduling. @@ -374,36 +374,29 @@ HASH_LOOP: return txs, missingTxs } -func (t *txPool) ProcessBlock(bi *runtime.BlockInfo) error { +func (t *txPool) ProcessBlock(bi *runtime.BlockInfo) { t.blockInfoLock.Lock() defer t.blockInfoLock.Unlock() - switch { - case t.blockInfo == nil: + if t.blockInfo == nil { close(t.initCh) - fallthrough - case bi.RuntimeBlock.Header.HeaderType == block.EpochTransition: - // Force recheck on epoch transitions. - t.recheckTxCh.In() <- struct{}{} - default: } t.blockInfo = bi t.lastBlockProcessed = time.Now() - // Trigger transaction rechecks if needed. - if (bi.RuntimeBlock.Header.Round - t.lastRecheckRound) > t.cfg.RecheckInterval { + // Force transaction rechecks on epoch transitions and if needed. + isEpochTransition := bi.RuntimeBlock.Header.HeaderType == block.EpochTransition + roundDifference := bi.RuntimeBlock.Header.Round - t.lastRecheckRound + if isEpochTransition || roundDifference > t.cfg.RecheckInterval { t.recheckTxCh.In() <- struct{}{} t.lastRecheckRound = bi.RuntimeBlock.Header.Round } - - return nil } -func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error { +func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) { t.rimQueue.Load(inMsgs) rimQueueSize.With(t.getMetricLabels()).Set(float64(t.rimQueue.size())) - return nil } func (t *txPool) WatchCheckedTransactions() (<-chan []*PendingCheckTransaction, pubsub.ClosableSubscription) { diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index b4f2be18a31..fb92b98219a 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -574,12 +574,7 @@ func (n *Node) handleNewBlockLocked(blk *block.Block, height int64) { return } - err = n.TxPool.ProcessBlock(bi) - if err != nil { - n.logger.Error("failed to process block in transaction pool", - "err", err, - ) - } + n.TxPool.ProcessBlock(bi) // Fetch incoming messages. inMsgs, err := n.Consensus.RootHash().GetIncomingMessageQueue(n.ctx, &roothash.InMessageQueueRequest{ @@ -594,12 +589,7 @@ func (n *Node) handleNewBlockLocked(blk *block.Block, height int64) { ) return } - err = n.TxPool.ProcessIncomingMessages(inMsgs) - if err != nil { - n.logger.Error("failed to process incoming messages in transaction pool", - "err", err, - ) - } + n.TxPool.ProcessIncomingMessages(inMsgs) for _, hooks := range n.hooks { hooks.HandleNewBlockLocked(bi) From 042dca9dd89e100e0d211e24f22553b603162a23 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Sun, 5 Nov 2023 23:52:20 +0100 Subject: [PATCH 06/12] go/worker/compute/executor: Verify epoch snapshot --- go/worker/compute/executor/committee/node.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 641dcf35cc2..bc31aae91f8 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -1531,6 +1531,13 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) { // Need to be an executor committee member. n.epoch = n.commonNode.Group.GetEpochSnapshot() + if epoch := n.epoch.GetEpochNumber(); epoch != bi.Epoch { + n.logger.Debug("skipping round, behind common worker", + "epoch", epoch, + "block_epoch", bi.Epoch, + ) + return + } if !n.epoch.IsExecutorMember() { n.logger.Debug("skipping round, not an executor member", "round", round, From 0309c44a690b6747bd9291129b3e6d50779b718c Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Sun, 5 Nov 2023 23:53:10 +0100 Subject: [PATCH 07/12] go/worker/compute/executor: Fix race condition --- go/worker/compute/executor/committee/node.go | 31 ++++++++++---------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index bc31aae91f8..1b98bc9a7b7 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -1477,22 +1477,21 @@ func (n *Node) worker() { } }() - // (Re)Start the runtime worker every time a runtime block is finalized. - var ( - wg sync.WaitGroup - bi *runtime.BlockInfo - ) + // Restart the round worker every time a runtime block is finalized. for { + var bi *runtime.BlockInfo + func() { - wg.Add(1) + var wg sync.WaitGroup defer wg.Wait() ctx, cancel := context.WithCancel(n.ctx) defer cancel() + wg.Add(1) go func() { defer wg.Done() - n.roundWorker(ctx, bi) + n.roundWorker(ctx) }() select { @@ -1501,6 +1500,9 @@ func (n *Node) worker() { } }() + // Round worker stopped, so it is safe to update the last block info. + n.blockInfo = bi + select { case <-n.stopCh: n.logger.Info("termination requested") @@ -1510,12 +1512,11 @@ func (n *Node) worker() { } } -func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) { - if bi == nil { +func (n *Node) roundWorker(ctx context.Context) { + if n.blockInfo == nil { return } - n.blockInfo = bi - round := bi.RuntimeBlock.Header.Round + 1 + round := n.blockInfo.RuntimeBlock.Header.Round + 1 n.handleRoundStarted() defer n.handleRoundEnded() @@ -1531,10 +1532,10 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) { // Need to be an executor committee member. n.epoch = n.commonNode.Group.GetEpochSnapshot() - if epoch := n.epoch.GetEpochNumber(); epoch != bi.Epoch { + if epoch := n.epoch.GetEpochNumber(); epoch != n.blockInfo.Epoch { n.logger.Debug("skipping round, behind common worker", "epoch", epoch, - "block_epoch", bi.Epoch, + "block_epoch", n.blockInfo.Epoch, ) return } @@ -1555,7 +1556,7 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) { // Fetch state and round results upfront. var err error - n.rtState, n.roundResults, err = n.getRtStateAndRoundResults(ctx, bi.ConsensusBlock.Height) + n.rtState, n.roundResults, err = n.getRtStateAndRoundResults(ctx, n.blockInfo.ConsensusBlock.Height) if err != nil { n.logger.Debug("skipping round, failed to fetch state and round results", "err", err, @@ -1565,7 +1566,7 @@ func (n *Node) roundWorker(ctx context.Context, bi *runtime.BlockInfo) { // Prepare flush timer for the primary transaction scheduler. flush := false - flushTimer := time.NewTimer(bi.ActiveDescriptor.TxnScheduler.BatchFlushTimeout) + flushTimer := time.NewTimer(n.blockInfo.ActiveDescriptor.TxnScheduler.BatchFlushTimeout) defer flushTimer.Stop() // Compute node's rank when scheduling transactions. From 057117869d64eec036c0e6757819d743a7707e51 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Mon, 6 Nov 2023 00:05:46 +0100 Subject: [PATCH 08/12] go/worker/compute/executor: Clear channels when not in the committee When a node is not a member of the committee, events, commitments, and transactions accumulate in the channels. When the node gets re-elected, it must process these channels first, potentially resulting in delayed submission of its commitments or incorrect discrepancy detection. --- .changelog/5426.bugfix.md | 1 + go/worker/compute/executor/committee/node.go | 93 +++++++++++--------- 2 files changed, 54 insertions(+), 40 deletions(-) create mode 100644 .changelog/5426.bugfix.md diff --git a/.changelog/5426.bugfix.md b/.changelog/5426.bugfix.md new file mode 100644 index 00000000000..5169c6d5843 --- /dev/null +++ b/.changelog/5426.bugfix.md @@ -0,0 +1 @@ +go/worker/compute/executor: Clear channels when not in the committee diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 1b98bc9a7b7..01fe0d89564 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -1352,51 +1352,45 @@ func (n *Node) estimatePoolRank(ctx context.Context, ec *commitment.ExecutorComm ) } -func (n *Node) handleRoundStarted() { - n.logger.Debug("starting round worker", - "round", n.blockInfo.RuntimeBlock.Header.Round+1, - ) - +func (n *Node) finalizePreviousRound() { n.logger.Info("considering the round finalized", "round", n.blockInfo.RuntimeBlock.Header.Round, "header_hash", n.blockInfo.RuntimeBlock.Header.EncodedHash(), "header_type", n.blockInfo.RuntimeBlock.Header.HeaderType, ) - if n.blockInfo.RuntimeBlock.Header.HeaderType != block.Normal { - return - } - if n.proposedBatch == nil { - return - } + if n.proposedBatch != nil && n.blockInfo.RuntimeBlock.Header.HeaderType == block.Normal { + switch n.blockInfo.RuntimeBlock.Header.IORoot.Equal(&n.proposedBatch.proposedIORoot) { + case false: + n.logger.Error("proposed batch was not finalized", + "header_io_root", n.blockInfo.RuntimeBlock.Header.IORoot, + "proposed_io_root", n.proposedBatch.proposedIORoot, + "header_type", n.blockInfo.RuntimeBlock.Header.HeaderType, + "batch_size", len(n.proposedBatch.txHashes), + ) + case true: + // Record time taken for successfully processing a batch. + batchProcessingTime.With(n.getMetricLabels()).Observe(time.Since(n.proposedBatch.batchStartTime).Seconds()) - if !n.blockInfo.RuntimeBlock.Header.IORoot.Equal(&n.proposedBatch.proposedIORoot) { - n.logger.Error("proposed batch was not finalized", - "header_io_root", n.blockInfo.RuntimeBlock.Header.IORoot, - "proposed_io_root", n.proposedBatch.proposedIORoot, - "header_type", n.blockInfo.RuntimeBlock.Header.HeaderType, - "batch_size", len(n.proposedBatch.txHashes), - ) - return - } + n.logger.Debug("removing processed batch from queue", + "batch_size", len(n.proposedBatch.txHashes), + "io_root", n.blockInfo.RuntimeBlock.Header.IORoot, + ) - // Record time taken for successfully processing a batch. - batchProcessingTime.With(n.getMetricLabels()).Observe(time.Since(n.proposedBatch.batchStartTime).Seconds()) + // Remove processed transactions from queue. + n.commonNode.TxPool.HandleTxsUsed(n.proposedBatch.txHashes) + } + } - n.logger.Debug("removing processed batch from queue", - "batch_size", len(n.proposedBatch.txHashes), - "io_root", n.blockInfo.RuntimeBlock.Header.IORoot, - ) + // Clear last proposal. + n.proposedBatch = nil - // Remove processed transactions from queue. - n.commonNode.TxPool.HandleTxsUsed(n.proposedBatch.txHashes) + // Clear proposal queue. + n.commonNode.TxPool.ClearProposedBatch() } -func (n *Node) handleRoundEnded() { - n.logger.Debug("stopping round worker", - "round", n.blockInfo.RuntimeBlock.Header.Round+1, - ) - +// resetNodeState transitions to the StateWaitingForBatch state. +func (n *Node) resetNodeState() { switch state := n.state.(type) { case StateWaitingForBatch: // Nothing to do here. @@ -1415,6 +1409,23 @@ func (n *Node) handleRoundEnded() { n.transitionState(StateWaitingForBatch{}) } +// drainChannels clears all worker's channels. +// +// This ensures that channels do not accumulate obsolete data when the round worker exits +// early due to non-membership in the executor committee or errors. +func (n *Node) drainChannels(ctx context.Context) { + for { + select { + case <-n.txCh: + case <-n.ecCh: + case <-n.evCh: + case <-n.reselectCh: + case <-ctx.Done(): + return + } + } +} + func (n *Node) worker() { defer close(n.quitCh) defer (n.cancelCtx)() @@ -1492,6 +1503,7 @@ func (n *Node) worker() { go func() { defer wg.Done() n.roundWorker(ctx) + n.drainChannels(ctx) }() select { @@ -1518,14 +1530,15 @@ func (n *Node) roundWorker(ctx context.Context) { } round := n.blockInfo.RuntimeBlock.Header.Round + 1 - n.handleRoundStarted() - defer n.handleRoundEnded() - - // Clear last proposal. - n.proposedBatch = nil + n.logger.Debug("round worker started", + "round", round, + ) + defer n.logger.Debug("round worker stopped", + "round", round, + ) - // Clear proposal queue. - n.commonNode.TxPool.ClearProposedBatch() + n.finalizePreviousRound() + defer n.resetNodeState() // Prune proposals. n.proposals.Prune(round) From 6b413fae6aeff40e8f9ffb7f762d16f3266477f2 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Mon, 6 Nov 2023 02:15:21 +0100 Subject: [PATCH 09/12] go/worker/compute/executor: Verify discrepancy events --- .../cometbft/apps/roothash/finalization.go | 6 ++++- go/roothash/api/api.go | 2 ++ go/roothash/tests/tester.go | 26 ++++++++++++------- .../compute/executor/committee/discrepancy.go | 14 ++++++++++ go/worker/compute/executor/committee/node.go | 1 + 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/go/consensus/cometbft/apps/roothash/finalization.go b/go/consensus/cometbft/apps/roothash/finalization.go index b622ea851a2..2528d7bb2c1 100644 --- a/go/consensus/cometbft/apps/roothash/finalization.go +++ b/go/consensus/cometbft/apps/roothash/finalization.go @@ -91,7 +91,11 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo ctx.EmitEvent( tmapi.NewEventBuilder(app.Name()). - TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{Rank: rtState.CommitmentPool.HighestRank, Timeout: timeout}). + TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{ + Round: round, + Rank: rtState.CommitmentPool.HighestRank, + Timeout: timeout, + }). TypedAttribute(&roothash.RuntimeIDAttribute{ID: rtState.Runtime.ID}), ) diff --git a/go/roothash/api/api.go b/go/roothash/api/api.go index c8fb331769e..ba7a2c3140e 100644 --- a/go/roothash/api/api.go +++ b/go/roothash/api/api.go @@ -425,6 +425,8 @@ func (e *ExecutorCommittedEvent) EventKind() string { // ExecutionDiscrepancyDetectedEvent is an execute discrepancy detected event. type ExecutionDiscrepancyDetectedEvent struct { + // Round is the round in which the discrepancy was detected. + Round uint64 `json:"round"` // Rank is the rank of the transaction scheduler. Rank uint64 `json:"rank"` // Timeout signals whether the discrepancy was due to a timeout. diff --git a/go/roothash/tests/tester.go b/go/roothash/tests/tester.go index 87fee767e22..44435542f71 100644 --- a/go/roothash/tests/tester.go +++ b/go/roothash/tests/tester.go @@ -51,6 +51,7 @@ type commitmentEvent struct { type discrepancyEvent struct { timeout bool rank uint64 + round uint64 } type finalizedEvent struct { @@ -418,6 +419,7 @@ func (s *runtimeState) verifyEvents(t *testing.T, ctx context.Context, backend a require.NotNil(ev.ExecutionDiscrepancyDetected, fmt.Sprintf("unexpected event: %+v", ev)) require.Equal(de.timeout, ev.ExecutionDiscrepancyDetected.Timeout, "timeout should match") require.Equal(de.rank, ev.ExecutionDiscrepancyDetected.Rank, "rank should match") + require.Equal(de.round, ev.ExecutionDiscrepancyDetected.Round, "round should match") } if fe != nil { @@ -590,7 +592,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse parent, err = nextRuntimeBlock(ch, nil) require.NoError(err, "nextRuntimeBlock") - require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round") + round := child.Block.Header.Round + 1 + require.EqualValues(round, parent.Block.Header.Round, "block round") require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed") // Check that round was finalized after 2.5*RoundTimeout blocks. @@ -599,7 +602,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse // Check that discrepancy resolution started after RoundTimeout blocks. height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10 - s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil) + s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil) // Check that the liveness statistics were computed correctly. verifyLivenessStatistics(parent) @@ -626,7 +629,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse parent, err = nextRuntimeBlock(ch, nil) require.NoError(err, "nextRuntimeBlock") - require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round") + round := child.Block.Header.Round + 1 + require.EqualValues(round, parent.Block.Header.Round, "block round") require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed") // Backup schedulers should wait for a double timeout. @@ -635,7 +639,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse // Check that round was finalized after 1.5*RoundTimeout blocks and that discrepancy // resolution started immediately. height := parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10 - s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:2]}, &discrepancyEvent{false, rank}, nil) + s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:2]}, &discrepancyEvent{false, rank, round}, nil) default: // Check that round was finalized after 2.5*RoundTimeout blocks. height := parent.Height - 25*s.rt.Runtime.Executor.RoundTimeout/10 @@ -643,7 +647,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse // Check that discrepancy resolution started after RoundTimeout blocks. height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10 - s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil) + s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil) } @@ -672,7 +676,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse parent, err = nextRuntimeBlock(ch, nil) require.NoError(err, "nextRuntimeBlock") - require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round") + round := child.Block.Header.Round + 1 + require.EqualValues(round, parent.Block.Header.Round, "block round") require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed") // Check that round was finalized after 2.5*RoundTimeout blocks. @@ -681,7 +686,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse // Check that discrepancy resolution started after RoundTimeout blocks. height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10 - s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil) + s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil) // Check that the liveness statistics were computed correctly. verifyLivenessStatistics(parent) @@ -712,7 +717,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse parent, err = nextRuntimeBlock(ch, nil) require.NoError(err, "nextRuntimeBlock") - require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round") + round := child.Block.Header.Round + 1 + require.EqualValues(round, parent.Block.Header.Round, "block round") require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed") // Backup schedulers should wait for a double timeout. @@ -721,7 +727,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse // Check that round was finalized after 1.5*RoundTimeout blocks and that discrepancy // resolution started immediately. height := parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10 - s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:3]}, &discrepancyEvent{false, rank}, nil) + s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:3]}, &discrepancyEvent{false, rank, round}, nil) default: // Check that round was finalized after 2.5*RoundTimeout blocks. height := parent.Height - 25*s.rt.Runtime.Executor.RoundTimeout/10 @@ -729,7 +735,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse // Check that discrepancy resolution started after RoundTimeout blocks. height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10 - s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil) + s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil) } diff --git a/go/worker/compute/executor/committee/discrepancy.go b/go/worker/compute/executor/committee/discrepancy.go index 5107cc366f1..cc18506de2c 100644 --- a/go/worker/compute/executor/committee/discrepancy.go +++ b/go/worker/compute/executor/committee/discrepancy.go @@ -9,14 +9,27 @@ import ( type discrepancyEvent struct { height uint64 + round uint64 rank uint64 timeout bool authoritative bool } func (n *Node) handleDiscrepancy(ctx context.Context, ev *discrepancyEvent) { + if ev.round != n.blockInfo.RuntimeBlock.Header.Round+1 { + n.logger.Debug("ignoring bad discrepancy event", + "height", ev.height, + "round", ev.round, + "rank", ev.rank, + "timeout", ev.timeout, + "authoritative", ev.authoritative, + ) + return + } + n.logger.Warn("execution discrepancy detected", "height", ev.height, + "round", ev.round, "rank", ev.rank, "timeout", ev.timeout, "authoritative", ev.authoritative, @@ -84,6 +97,7 @@ func (n *Node) predictDiscrepancy(ctx context.Context, ec *commitment.ExecutorCo n.handleDiscrepancy(ctx, &discrepancyEvent{ height: uint64(n.blockInfo.ConsensusBlock.Height), + round: n.blockInfo.RuntimeBlock.Header.Round + 1, rank: n.commitPool.HighestRank, timeout: false, authoritative: false, diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 01fe0d89564..b23f2f1a338 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -1262,6 +1262,7 @@ func (n *Node) handleEvent(ctx context.Context, ev *roothash.Event) { case ev.ExecutionDiscrepancyDetected != nil: n.handleDiscrepancy(ctx, &discrepancyEvent{ height: uint64(ev.Height), + round: ev.ExecutionDiscrepancyDetected.Round, rank: ev.ExecutionDiscrepancyDetected.Rank, timeout: ev.ExecutionDiscrepancyDetected.Timeout, authoritative: true, From 706b2cd666a3130a4f98c2fdd21619219a64709b Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Mon, 6 Nov 2023 10:24:09 +0100 Subject: [PATCH 10/12] go/worker/common/committee: Remove unused node hooks --- go/worker/client/committee/node.go | 11 ----------- go/worker/common/committee/node.go | 13 ------------- go/worker/common/committee/p2p.go | 6 ------ go/worker/compute/executor/committee/hooks.go | 14 -------------- go/worker/storage/committee/node.go | 10 ---------- 5 files changed, 54 deletions(-) diff --git a/go/worker/client/committee/node.go b/go/worker/client/committee/node.go index 145fbb842f8..49a4b45b6fa 100644 --- a/go/worker/client/committee/node.go +++ b/go/worker/client/committee/node.go @@ -77,17 +77,6 @@ func (n *Node) Initialized() <-chan struct{} { return n.initCh } -// HandlePeerTx is guarded by CrossNode. -func (n *Node) HandlePeerTx(context.Context, []byte) error { - // Nothing to do here. - return nil -} - -// HandleEpochTransitionLocked is guarded by CrossNode. -func (n *Node) HandleEpochTransitionLocked(*committee.EpochSnapshot) { - // Nothing to do here. -} - // HandleNewBlockEarlyLocked is guarded by CrossNode. func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { // Nothing to do here. diff --git a/go/worker/common/committee/node.go b/go/worker/common/committee/node.go index fb92b98219a..f33d88e2362 100644 --- a/go/worker/common/committee/node.go +++ b/go/worker/common/committee/node.go @@ -127,11 +127,6 @@ var ( // NodeHooks defines a worker's duties at common events. // These are called from the runtime's common node's worker. type NodeHooks interface { - // HandlePeerTx handles a transaction received from a (non-local) peer. - HandlePeerTx(ctx context.Context, tx []byte) error - - // Guarded by CrossNode. - HandleEpochTransitionLocked(*EpochSnapshot) // Guarded by CrossNode. HandleNewBlockEarlyLocked(*runtime.BlockInfo) // Guarded by CrossNode. @@ -365,9 +360,6 @@ func (n *Node) handleEpochTransitionLocked(height int64) { } epochNumber.With(n.getMetricLabels()).Set(float64(epoch.epochNumber)) - for _, hooks := range n.hooks { - hooks.HandleEpochTransitionLocked(epoch) - } } // Guarded by n.CrossNode. @@ -377,11 +369,6 @@ func (n *Node) handleSuspendLocked(int64) { // Suspend group. n.Group.Suspend() - epoch := n.Group.GetEpochSnapshot() - for _, hooks := range n.hooks { - hooks.HandleEpochTransitionLocked(epoch) - } - // If the runtime has been suspended, we need to switch to checking the latest registry // descriptor instead of the active one as otherwise we may miss deployment updates and never // register, keeping the runtime suspended. diff --git a/go/worker/common/committee/p2p.go b/go/worker/common/committee/p2p.go index da015a16635..a9b37e0a298 100644 --- a/go/worker/common/committee/p2p.go +++ b/go/worker/common/committee/p2p.go @@ -52,12 +52,6 @@ func (h *txMsgHandler) HandleMessage(ctx context.Context, _ signature.PublicKey, } } - // Dispatch to any transaction handlers. - for _, hooks := range h.n.hooks { - if err := hooks.HandlePeerTx(ctx, tx); err != nil { - return err - } - } return nil } diff --git a/go/worker/compute/executor/committee/hooks.go b/go/worker/compute/executor/committee/hooks.go index 017f67e4463..b4d25ded065 100644 --- a/go/worker/compute/executor/committee/hooks.go +++ b/go/worker/compute/executor/committee/hooks.go @@ -1,8 +1,6 @@ package committee import ( - "context" - "github.com/oasisprotocol/oasis-core/go/common/crash" runtime "github.com/oasisprotocol/oasis-core/go/runtime/api" "github.com/oasisprotocol/oasis-core/go/worker/common/committee" @@ -11,18 +9,6 @@ import ( // Ensure Node implements NodeHooks. var _ committee.NodeHooks = (*Node)(nil) -// HandlePeerTx implements NodeHooks. -func (n *Node) HandlePeerTx(context.Context, []byte) error { - // Nothing to do here. - return nil -} - -// HandleEpochTransitionLocked implements NodeHooks. -// Guarded by n.commonNode.CrossNode. -func (n *Node) HandleEpochTransitionLocked(*committee.EpochSnapshot) { - // Nothing to do here. -} - // HandleNewBlockEarlyLocked implements NodeHooks. // Guarded by n.commonNode.CrossNode. func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 4734431463e..974fc4de34e 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -337,16 +337,6 @@ func (n *Node) GetLocalStorage() storageApi.LocalBackend { // NodeHooks implementation. -func (n *Node) HandlePeerTx(context.Context, []byte) error { - // Nothing to do here. - return nil -} - -// HandleEpochTransitionLocked is guarded by CrossNode. -func (n *Node) HandleEpochTransitionLocked(*committee.EpochSnapshot) { - // Nothing to do here. -} - // HandleNewBlockEarlyLocked is guarded by CrossNode. func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) { // Nothing to do here. From aa1711acc0104b8b8032ce65da7e3fe2adefaf11 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Tue, 7 Nov 2023 11:21:10 +0100 Subject: [PATCH 11/12] go/worker/compute/executor: Unlock drain after rt scheduling finishes --- go/worker/compute/executor/committee/node.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index b23f2f1a338..2956ffaf469 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -395,7 +395,6 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) { // Ask the transaction pool to get a batch of transactions for us and see if we should be // proposing a new batch to other nodes. batch := n.commonNode.TxPool.GetSchedulingSuggestion(rtInfo.Features.ScheduleControl.InitialBatchSize) - defer n.commonNode.TxPool.FinishScheduling() switch { case force: // Batch flush timeout expired, schedule empty batch. @@ -412,6 +411,7 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) { default: // No need to schedule a batch. n.logger.Debug("not scheduling, no transactions") + n.commonNode.TxPool.FinishScheduling() return } @@ -431,6 +431,7 @@ func (n *Node) scheduleBatch(ctx context.Context, round uint64, force bool) { go func() { defer close(done) n.startSchedulingBatch(ctx, batch) + n.commonNode.TxPool.FinishScheduling() }() } From 626172dcaaef77fc7f89acce1e7454c105e68738 Mon Sep 17 00:00:00 2001 From: Peter Nose Date: Tue, 7 Nov 2023 11:43:32 +0100 Subject: [PATCH 12/12] go/worker/compute/executor: Defer call to time.Since --- go/worker/compute/executor/committee/node.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/worker/compute/executor/committee/node.go b/go/worker/compute/executor/committee/node.go index 2956ffaf469..12fccf9530a 100644 --- a/go/worker/compute/executor/committee/node.go +++ b/go/worker/compute/executor/committee/node.go @@ -831,7 +831,9 @@ func (n *Node) proposeBatch( // Commit I/O and state write logs to storage. storageErr := func() error { start := time.Now() - defer storageCommitLatency.With(n.getMetricLabels()).Observe(time.Since(start).Seconds()) + defer func() { + storageCommitLatency.With(n.getMetricLabels()).Observe(time.Since(start).Seconds()) + }() ctx, cancel := context.WithCancel(roundCtx) defer cancel()