Skip to content

Commit

Permalink
Merge pull request #5426 from oasisprotocol/peternose/bugfix/minor-fixes
Browse files Browse the repository at this point in the history
go/worker/compute/executor: Minor fixes
  • Loading branch information
peternose authored Nov 7, 2023
2 parents 12e86f1 + 626172d commit abbab48
Show file tree
Hide file tree
Showing 18 changed files with 166 additions and 190 deletions.
1 change: 1 addition & 0 deletions .changelog/5426.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
go/worker/compute/executor: Clear channels when not in the committee
21 changes: 17 additions & 4 deletions go/consensus/cometbft/apps/roothash/finalization.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,28 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
switch err {
case commitment.ErrDiscrepancyDetected:
ctx.Logger().Warn("executor discrepancy detected",
"runtime_id", rtState.Runtime.ID,
"round", round,
"rank", rtState.CommitmentPool.HighestRank,
"timeout", timeout,
logging.LogEvent, roothash.LogEventExecutionDiscrepancyDetected,
)

ctx.EmitEvent(
tmapi.NewEventBuilder(app.Name()).
TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{Rank: rtState.CommitmentPool.HighestRank, Timeout: timeout}).
TypedAttribute(&roothash.ExecutionDiscrepancyDetectedEvent{
Round: round,
Rank: rtState.CommitmentPool.HighestRank,
Timeout: timeout,
}).
TypedAttribute(&roothash.RuntimeIDAttribute{ID: rtState.Runtime.ID}),
)

// Re-arm round timeout. Give backup workers enough time to submit commitments.
prevTimeout := rtState.NextTimeout
rtState.NextTimeout = ctx.BlockHeight() + 1 + (rtState.Runtime.Executor.RoundTimeout*backupWorkerTimeoutFactorNumerator)/backupWorkerTimeoutFactorDenominator // Current height is ctx.BlockHeight() + 1

if err = rearmRoundTimeout(ctx, rtState.Runtime.ID, prevTimeout, rtState.NextTimeout); err != nil {
if err = rearmRoundTimeout(ctx, rtState.Runtime.ID, round, prevTimeout, rtState.NextTimeout); err != nil {
return err
}

Expand All @@ -114,6 +120,7 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
case commitment.ErrStillWaiting:
// Need more commits.
ctx.Logger().Debug("insufficient commitments for finality, waiting",
"runtime_id", rtState.Runtime.ID,
"round", round,
)
return nil
Expand All @@ -132,8 +139,11 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo

// The round has been finalized.
ctx.Logger().Debug("finalized round",
"runtime_id", rtState.Runtime.ID,
"round", round,
"priority", pool.HighestRank,
"rank", pool.HighestRank,
"scheduler_id", sc.Commitment.Header.SchedulerID,
"timeout", timeout,
)

livenessStats.TotalRounds++
Expand Down Expand Up @@ -231,6 +241,8 @@ func (app *rootHashApplication) tryFinalizeRoundInsideTx( //nolint: gocyclo
switch rtState.CommitmentPool.Discrepancy {
case true:
ctx.Logger().Debug("executor pool discrepancy",
"runtime_id", rtState.Runtime.ID,
"round", round,
"slashing", rtState.Runtime.Staking.Slashing,
)

Expand Down Expand Up @@ -316,7 +328,7 @@ func (app *rootHashApplication) finalizeBlock(ctx *tmapi.Context, rtState *rooth
prevTimeout := rtState.NextTimeout
rtState.NextTimeout = roothash.TimeoutNever

return rearmRoundTimeout(ctx, rtState.Runtime.ID, prevTimeout, rtState.NextTimeout)
return rearmRoundTimeout(ctx, rtState.Runtime.ID, blk.Header.Round, prevTimeout, rtState.NextTimeout)
}

func (app *rootHashApplication) failRound(
Expand All @@ -327,6 +339,7 @@ func (app *rootHashApplication) failRound(
round := rtState.LastBlock.Header.Round + 1

ctx.Logger().Debug("round failed",
"runtime_id", rtState.Runtime.ID,
"round", round,
"err", err,
logging.LogEvent, roothash.LogEventRoundFailed,
Expand Down
1 change: 1 addition & 0 deletions go/consensus/cometbft/apps/roothash/roothash.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (app *rootHashApplication) onCommitteeChanged(ctx *tmapi.Context, state *ro
ctx.Logger().Debug("updating committee for runtime",
"runtime_id", rt.ID,
"epoch", epoch,
"committee", committee,
)

// Emit an empty block signaling epoch transition. This is required so that
Expand Down
5 changes: 4 additions & 1 deletion go/consensus/cometbft/apps/roothash/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ func (app *rootHashApplication) processRoundTimeouts(ctx *tmapi.Context) error {

func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace) error {
ctx.Logger().Warn("round timeout expired, forcing finalization",
"runtime_id", runtimeID,
logging.LogEvent, roothash.LogEventTimerFired,
)

if err := app.tryFinalizeRound(ctx, runtimeID, true); err != nil {
ctx.Logger().Error("failed to finalize round",
"runtime_id", runtimeID,
"err", err,
)
return fmt.Errorf("failed to finalize round: %w", err)
Expand All @@ -48,14 +50,15 @@ func (app *rootHashApplication) processRoundTimeout(ctx *tmapi.Context, runtimeI
return nil
}

func rearmRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace, prevTimeout int64, nextTimeout int64) error {
func rearmRoundTimeout(ctx *tmapi.Context, runtimeID common.Namespace, round uint64, prevTimeout int64, nextTimeout int64) error {
// Re-arm only if the round timeout has changed.
if prevTimeout == nextTimeout {
return nil
}

ctx.Logger().Debug("re-arming round timeout",
"runtime_id", runtimeID,
"round", round,
"prev_timeout", prevTimeout,
"next_timeout", nextTimeout,
"height", ctx.BlockHeight()+1, // Current height is ctx.BlockHeight() + 1
Expand Down
6 changes: 5 additions & 1 deletion go/consensus/cometbft/apps/roothash/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ func (app *rootHashApplication) executorCommit(

// Check if higher-ranked scheduler submitted a commitment.
if prevRank != rtState.CommitmentPool.HighestRank {
round := rtState.LastBlock.Header.Round + 1

ctx.Logger().Debug("transaction scheduler has changed",
"runtime_id", cc.ID,
"round", round,
"prev_rank", prevRank,
"new_rank", rtState.CommitmentPool.HighestRank,
)
Expand All @@ -142,7 +146,7 @@ func (app *rootHashApplication) executorCommit(
prevTimeout := rtState.NextTimeout
rtState.NextTimeout = ctx.BlockHeight() + 1 + rtState.Runtime.Executor.RoundTimeout // Current height is ctx.BlockHeight() + 1

if err := rearmRoundTimeout(ctx, cc.ID, prevTimeout, rtState.NextTimeout); err != nil {
if err := rearmRoundTimeout(ctx, cc.ID, round, prevTimeout, rtState.NextTimeout); err != nil {
return err
}
}
Expand Down
2 changes: 2 additions & 0 deletions go/roothash/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,8 @@ func (e *ExecutorCommittedEvent) EventKind() string {

// ExecutionDiscrepancyDetectedEvent is an execution discrepancy detected event.
type ExecutionDiscrepancyDetectedEvent struct {
// Round is the round in which the discrepancy was detected.
Round uint64 `json:"round"`
// Rank is the rank of the transaction scheduler.
Rank uint64 `json:"rank"`
// Timeout signals whether the discrepancy was due to a timeout.
Expand Down
26 changes: 16 additions & 10 deletions go/roothash/tests/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type commitmentEvent struct {
// discrepancyEvent holds the values that verifyEvents expects to find in an
// ExecutionDiscrepancyDetected event.
//
// Tests construct it with positional (unkeyed) literals, so the field order
// below must not be changed.
type discrepancyEvent struct {
// timeout is compared against the event's Timeout field.
timeout bool
// rank is compared against the event's Rank field.
rank uint64
// round is compared against the event's Round field.
round uint64
}

type finalizedEvent struct {
Expand Down Expand Up @@ -418,6 +419,7 @@ func (s *runtimeState) verifyEvents(t *testing.T, ctx context.Context, backend a
require.NotNil(ev.ExecutionDiscrepancyDetected, fmt.Sprintf("unexpected event: %+v", ev))
require.Equal(de.timeout, ev.ExecutionDiscrepancyDetected.Timeout, "timeout should match")
require.Equal(de.rank, ev.ExecutionDiscrepancyDetected.Rank, "rank should match")
require.Equal(de.round, ev.ExecutionDiscrepancyDetected.Round, "round should match")
}

if fe != nil {
Expand Down Expand Up @@ -590,7 +592,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
parent, err = nextRuntimeBlock(ch, nil)
require.NoError(err, "nextRuntimeBlock")

require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round")
round := child.Block.Header.Round + 1
require.EqualValues(round, parent.Block.Header.Round, "block round")
require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed")

// Check that round was finalized after 2.5*RoundTimeout blocks.
Expand All @@ -599,7 +602,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse

// Check that discrepancy resolution started after RoundTimeout blocks.
height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil)
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil)

// Check that the liveness statistics were computed correctly.
verifyLivenessStatistics(parent)
Expand All @@ -626,7 +629,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
parent, err = nextRuntimeBlock(ch, nil)
require.NoError(err, "nextRuntimeBlock")

require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round")
round := child.Block.Header.Round + 1
require.EqualValues(round, parent.Block.Header.Round, "block round")
require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed")

// Backup schedulers should wait for a double timeout.
Expand All @@ -635,15 +639,15 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
// Check that round was finalized after 1.5*RoundTimeout blocks and that discrepancy
// resolution started immediately.
height := parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:2]}, &discrepancyEvent{false, rank}, nil)
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:2]}, &discrepancyEvent{false, rank, round}, nil)
default:
// Check that round was finalized after 2.5*RoundTimeout blocks.
height := parent.Height - 25*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:2]}, nil, nil)

// Check that discrepancy resolution started after RoundTimeout blocks.
height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil)
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil)

}

Expand Down Expand Up @@ -672,7 +676,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
parent, err = nextRuntimeBlock(ch, nil)
require.NoError(err, "nextRuntimeBlock")

require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round")
round := child.Block.Header.Round + 1
require.EqualValues(round, parent.Block.Header.Round, "block round")
require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed")

// Check that round was finalized after 2.5*RoundTimeout blocks.
Expand All @@ -681,7 +686,7 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse

// Check that discrepancy resolution started after RoundTimeout blocks.
height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil)
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil)

// Check that the liveness statistics were computed correctly.
verifyLivenessStatistics(parent)
Expand Down Expand Up @@ -712,7 +717,8 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
parent, err = nextRuntimeBlock(ch, nil)
require.NoError(err, "nextRuntimeBlock")

require.EqualValues(child.Block.Header.Round+1, parent.Block.Header.Round, "block round")
round := child.Block.Header.Round + 1
require.EqualValues(round, parent.Block.Header.Round, "block round")
require.EqualValues(block.RoundFailed, parent.Block.Header.HeaderType, "block header type must be RoundFailed")

// Backup schedulers should wait for a double timeout.
Expand All @@ -721,15 +727,15 @@ func (s *runtimeState) testRoundTimeout(t *testing.T, backend api.Backend, conse
// Check that round was finalized after 1.5*RoundTimeout blocks and that discrepancy
// resolution started immediately.
height := parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:3]}, &discrepancyEvent{false, rank}, nil)
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:3]}, &discrepancyEvent{false, rank, round}, nil)
default:
// Check that round was finalized after 2.5*RoundTimeout blocks.
height := parent.Height - 25*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, &commitmentEvent{executorCommits[:3]}, nil, nil)

// Check that discrepancy resolution started after RoundTimeout blocks.
height = parent.Height - 15*s.rt.Runtime.Executor.RoundTimeout/10
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank}, nil)
s.verifyEvents(t, ctx, backend, height, nil, &discrepancyEvent{true, rank, round}, nil)

}

Expand Down
2 changes: 1 addition & 1 deletion go/runtime/host/sgx/sgx.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const (
// nodes on a single machine, all sharing the same EPC.
runtimeRAKTimeout = 60 * time.Second
// Runtime attest interval.
defaultRuntimeAttestInterval = 1 * time.Hour
defaultRuntimeAttestInterval = 2 * time.Hour
)

// Config contains SGX-specific provisioner configuration options.
Expand Down
31 changes: 12 additions & 19 deletions go/runtime/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,14 @@ type TransactionPool interface {
GetKnownBatch(batch []hash.Hash) ([]*TxQueueMeta, map[hash.Hash]int)

// ProcessBlock updates the last known runtime block information.
ProcessBlock(bi *runtime.BlockInfo) error
ProcessBlock(bi *runtime.BlockInfo)

// ProcessIncomingMessages loads transactions from incoming messages into the pool.
ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error
ProcessIncomingMessages(inMsgs []*message.IncomingMessage)

// WatchCheckedTransactions subscribes to notifications about new transactions being available
// in the transaction pool for scheduling.
WatchCheckedTransactions() (pubsub.ClosableSubscription, <-chan []*PendingCheckTransaction)
WatchCheckedTransactions() (<-chan []*PendingCheckTransaction, pubsub.ClosableSubscription)

// PendingCheckSize returns the number of transactions currently pending to be checked.
PendingCheckSize() int
Expand Down Expand Up @@ -374,43 +374,36 @@ HASH_LOOP:
return txs, missingTxs
}

func (t *txPool) ProcessBlock(bi *runtime.BlockInfo) error {
func (t *txPool) ProcessBlock(bi *runtime.BlockInfo) {
t.blockInfoLock.Lock()
defer t.blockInfoLock.Unlock()

switch {
case t.blockInfo == nil:
if t.blockInfo == nil {
close(t.initCh)
fallthrough
case bi.RuntimeBlock.Header.HeaderType == block.EpochTransition:
// Force recheck on epoch transitions.
t.recheckTxCh.In() <- struct{}{}
default:
}

t.blockInfo = bi
t.lastBlockProcessed = time.Now()

// Trigger transaction rechecks if needed.
if (bi.RuntimeBlock.Header.Round - t.lastRecheckRound) > t.cfg.RecheckInterval {
// Force transaction rechecks on epoch transitions, or when more than
// RecheckInterval rounds have passed since the last recheck.
isEpochTransition := bi.RuntimeBlock.Header.HeaderType == block.EpochTransition
roundDifference := bi.RuntimeBlock.Header.Round - t.lastRecheckRound
if isEpochTransition || roundDifference > t.cfg.RecheckInterval {
t.recheckTxCh.In() <- struct{}{}
t.lastRecheckRound = bi.RuntimeBlock.Header.Round
}

return nil
}

func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) error {
func (t *txPool) ProcessIncomingMessages(inMsgs []*message.IncomingMessage) {
t.rimQueue.Load(inMsgs)
rimQueueSize.With(t.getMetricLabels()).Set(float64(t.rimQueue.size()))
return nil
}

func (t *txPool) WatchCheckedTransactions() (pubsub.ClosableSubscription, <-chan []*PendingCheckTransaction) {
func (t *txPool) WatchCheckedTransactions() (<-chan []*PendingCheckTransaction, pubsub.ClosableSubscription) {
sub := t.checkTxNotifier.Subscribe()
ch := make(chan []*PendingCheckTransaction)
sub.Unwrap(ch)
return sub, ch
return ch, sub
}

func (t *txPool) PendingCheckSize() int {
Expand Down
11 changes: 0 additions & 11 deletions go/worker/client/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,6 @@ func (n *Node) Initialized() <-chan struct{} {
return n.initCh
}

// HandlePeerTx is guarded by CrossNode.
func (n *Node) HandlePeerTx(context.Context, []byte) error {
// Nothing to do here.
return nil
}

// HandleEpochTransitionLocked is guarded by CrossNode.
func (n *Node) HandleEpochTransitionLocked(*committee.EpochSnapshot) {
// Nothing to do here.
}

// HandleNewBlockEarlyLocked is guarded by CrossNode.
func (n *Node) HandleNewBlockEarlyLocked(*runtime.BlockInfo) {
// Nothing to do here.
Expand Down
Loading

0 comments on commit abbab48

Please sign in to comment.